首页 > 编程语言 > 详细

Effective_Python mapreduce

时间:2016-03-27 23:51:27      阅读:316      评论:0      收藏:0      [点我收藏+]

完全吊炸天构造器的写法。。。

import os
import threading,time
class GenericInputData(object):
    """Abstract description of one piece of input for a map-reduce job.

    Both methods raise ``NotImplementedError`` here; a concrete subclass
    is expected to override them.
    """

    @classmethod
    def generate_inputs(cls, config):
        """Build the input objects described by ``config`` (must be overridden)."""
        raise NotImplementedError()

    def read(self):
        """Return the data of this input (must be overridden)."""
        raise NotImplementedError()
    
class PathInputData(GenericInputData):
    """Input data stored in a single file identified by its path."""

    def __init__(self, path):
        """Remember ``path``; the file is only opened when ``read`` is called."""
        super(PathInputData, self).__init__()
        self.path = path

    def read(self):
        """Return the entire content of the file at ``self.path``."""
        # The context manager closes the handle even if reading raises.
        with open(self.path) as handle:
            return handle.read()

    def get_path_name(self):
        """Return the path this input reads from."""
        return self.path

    @classmethod
    def generate_inputs(cls, config):
        """Yield one instance per entry of the directory ``config["data_dir"]``.

        Acts as a generic constructor: it calls ``cls(...)``, so subclasses
        get instances of their own type without overriding this method.

        Raises ``KeyError`` if ``config`` has no ``"data_dir"`` key.
        """
        # The key is the string "data_dir"; a bare name here is a NameError.
        data_dir = config["data_dir"]
        for name in os.listdir(data_dir):
            yield cls(os.path.join(data_dir, name))

class GenerateWorker(object):
    """Base class for a unit of map-reduce work bound to one input object.

    ``map`` and ``reduce`` raise ``NotImplementedError`` here and are meant
    to be overridden; ``result`` stays ``None`` until a subclass sets it.
    """

    def __init__(self, input_data):
        self.result = None
        self.input_data = input_data

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` worker per generated input.

        The inputs come from ``input_class.generate_inputs(config)``; each
        one is passed straight to the constructor of ``cls``.
        """
        return [cls(item) for item in input_class.generate_inputs(config)]

    def map(self):
        raise NotImplementedError()

    def reduce(self, other):
        raise NotImplementedError()


class LineCountWorker(GenerateWorker):
    """Worker that counts the newline characters of its input.

    No ``__init__`` is defined here: construction is inherited from
    ``GenerateWorker``, so ``LineCountWorker(input_data)`` works unchanged.
    """

    def get_worker_name(self):
        """Return the path name reported by the wrapped input object."""
        return self.input_data.get_path_name()

    def map(self):
        """Read the input and store its number of ``"\\n"`` characters in ``result``."""
        text = self.input_data.read()
        self.result = text.count("\n")

    def reduce(self, other):
        """Add ``other.result`` onto this worker's ``result`` in place."""
        self.result += other.result


class Thread_Excute_workers(threading.Thread):
    """Thread that runs the ``map`` step of a single worker."""

    def __init__(self, threadId, worker):
        """Store the worker to run and the numeric id used in the log line."""
        super(Thread_Excute_workers, self).__init__()
        self.worker = worker
        self.th_id = threadId

    def run(self):
        """Execute ``worker.map()`` and then report which input was processed."""
        self.worker.map()
        # The newline must be a string literal; a bare backslash-n outside
        # quotes is a syntax error. The single parenthesised argument keeps
        # this line valid under both Python 2 and Python 3.
        print("Thread ID " + str(self.th_id) + " run "
              + self.worker.get_worker_name() + "\n")

def excute(workers):
    """Run every worker's map step on its own thread, then reduce the results.

    One ``Thread_Excute_workers`` is started per worker, numbered from 0 in
    iteration order. After all threads have been joined, every remaining
    worker is folded into the first one via ``first.reduce(other)``.

    Args:
        workers: iterable of worker objects providing ``map``, ``reduce``
            and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after reduction, or ``None``
        when ``workers`` is empty.
    """
    # Materialise once so any iterable can be both counted and indexed.
    workers = list(workers)
    if not workers:
        # Nothing to map or reduce (e.g. an empty input directory);
        # indexing workers[0] below would raise IndexError.
        return None

    threads = []
    for thread_id, worker in enumerate(workers):
        thread = Thread_Excute_workers(thread_id, worker)
        thread.start()
        threads.append(thread)
    # Wait for every map step to finish before reading any result.
    for thread in threads:
        thread.join()

    # Calculate the reduced result by folding every worker into the first.
    first, rest = workers[0], workers[1:]
    for other in rest:
        first.reduce(other)
    return first.result

def mapreduce(worker_class, input_class, config):
    """Create the workers described by ``config`` and return their combined result.

    ``worker_class.create_workers(input_class, config)`` builds the workers,
    which are then handed to ``excute``.
    """
    return excute(worker_class.create_workers(input_class, config))


if __name__ == "__main__":
    # Run the example only when executed as a script, so importing this
    # module does not touch the file system or start threads.
    # The dictionary key must be the string "data_dir"; an unquoted name
    # here is an undefined variable.
    config = {"data_dir": "C:\\data_dir"}
    result = mapreduce(LineCountWorker, PathInputData, config)
    print(result)

 

Effective_Python mapreduce

原文:http://www.cnblogs.com/gearslogy/p/5327153.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!