# 完全吊炸天构造器的写法……
import os
import threading
import time


class GenericInputData(object):
    """Abstract source of input data for a map/reduce job."""

    def read(self):
        """Return the content of this input. Subclasses must override."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Build input instances from *config*. Subclasses must override."""
        raise NotImplementedError


class PathInputData(GenericInputData):
    """Input data backed by a file system path."""

    def __init__(self, path):
        super(PathInputData, self).__init__()
        self.path = path

    def read(self):
        """Return the whole content of the file at ``self.path``."""
        # Use a context manager so the handle is closed as soon as the
        # content has been read instead of whenever it gets collected.
        with open(self.path) as handle:
            return handle.read()

    def get_path_name(self):
        """Return the path this input reads from."""
        return self.path

    @classmethod
    def generate_inputs(cls, config):
        """Yield one instance per entry of the ``config['data_dir']`` directory.

        Acts as an alternate constructor: each yielded object is created
        through ``cls(...)``, i.e. through ``__init__``.
        """
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            yield cls(os.path.join(data_dir, name))


class GenerateWorker(object):
    """Base class of a worker that maps one input and reduces with peers."""

    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    def map(self):
        """Compute ``self.result`` from ``self.input_data``. Must be overridden."""
        raise NotImplementedError

    def reduce(self, other):
        """Fold the result of *other* into this worker. Must be overridden."""
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one worker per input generated by *input_class*.

        Each worker is built with ``cls(input_data)``, so calling this on a
        subclass produces workers of that subclass.
        """
        return [cls(input_data)
                for input_data in input_class.generate_inputs(config)]


class LineCountWorker(GenerateWorker):
    """Worker counting the newline characters of its input.

    No ``__init__`` is defined here: the constructor inherited from
    ``GenerateWorker`` is used as is.
    """

    def map(self):
        """Store the number of ``"\\n"`` characters of the input in ``result``."""
        data = self.input_data.read()
        self.result = data.count("\n")

    def reduce(self, other):
        """Add the count of *other* to this worker's count."""
        self.result += other.result

    def get_worker_name(self):
        """Return the path name of the input handled by this worker."""
        return self.input_data.get_path_name()


class Thread_Excute_workers(threading.Thread):
    """Thread running the map step of a single worker."""

    def __init__(self, threadId, worker):
        super(Thread_Excute_workers, self).__init__()
        self.worker = worker
        self.th_id = threadId

    def run(self):
        """Run ``worker.map()`` and report which worker this thread handled."""
        self.worker.map()
        # Single-argument parenthesised form prints the same text under both
        # the Python 2 print statement and the Python 3 print function.
        print("Thread ID " + str(self.th_id) + " run "
              + self.worker.get_worker_name() + '\n')


def excute(workers):
    """Map every worker in its own thread, then reduce them into one result.

    Thread ids are assigned in list order starting at 0. After all threads
    have been joined, every other worker is reduced into the first one.

    Returns the ``result`` of the first worker, or ``None`` when *workers*
    is empty.
    """
    if not workers:
        # Nothing to map or reduce; indexing workers[0] would raise IndexError.
        return None

    threads = []
    for thread_id, worker in enumerate(workers):
        thread = Thread_Excute_workers(thread_id, worker)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()

    # Reduce only after every map step has finished.
    first, rest = workers[0], workers[1:]
    for other in rest:
        first.reduce(other)
    return first.result


def mapreduce(worker_class, input_class, config):
    """Create the workers described by *config* and return the reduced result."""
    workers = worker_class.create_workers(input_class, config)
    return excute(workers)


config = {'data_dir': "C:\\data_dir"}

if __name__ == "__main__":
    # Only run the demo when executed as a script, so that importing this
    # module does not touch the file system.
    result = mapreduce(LineCountWorker, PathInputData, config)
    print(result)
# 原文:http://www.cnblogs.com/gearslogy/p/5327153.html