
[Repost] Understanding ChainMapper Through an Example

Posted: 2015-12-24 19:23:12

ChainMapper lets you combine several Mapper classes into a single map task.
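To make "a single map task" concrete, here is a plain-Java model (no Hadoop dependency) of how a chain hands records along. Each record emitted by one mapper is passed directly to the next mapper's map call, so all stages run inside the same task and no intermediate job or HDFS output is produced. `MapperChain` and `SimpleMapper` are hypothetical names used only for this sketch, not Hadoop classes. The real ChainMapper works with `Mapper` and `OutputCollector` objects rather than lambdas.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Plain-Java model of mapper chaining; NOT the Hadoop API.
public class MapperChain {

    // A mapper receives one (key, value) pair and may emit zero or more pairs.
    interface SimpleMapper {
        void map(String key, String value, BiConsumer<String, String> out);
    }

    private final List<SimpleMapper> stages = new ArrayList<>();

    public MapperChain add(SimpleMapper mapper) {
        stages.add(mapper);
        return this;
    }

    // Feed one input record into the chain; the last stage's output goes to 'sink'.
    public void process(String key, String value, BiConsumer<String, String> sink) {
        push(0, key, value, sink);
    }

    // Whatever stage i emits is handed straight to stage i + 1.
    private void push(int i, String key, String value, BiConsumer<String, String> sink) {
        if (i == stages.size()) {
            sink.accept(key, value);
            return;
        }
        stages.get(i).map(key, value, (k, v) -> push(i + 1, k, v, sink));
    }

    // Runs three records through two logging pass-through mappers and returns the call order.
    public static List<String> trace() {
        List<String> log = new ArrayList<>();
        MapperChain chain = new MapperChain()
            .add((k, v, out) -> { log.add("A:" + k); out.accept(k, v); })
            .add((k, v, out) -> { log.add("B:" + k); out.accept(k, v); });
        for (String key : Arrays.asList("100", "101", "102")) {
            chain.process(key, "value", (k, v) -> log.add("out:" + k));
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [A:100, B:100, out:100, A:101, B:101, out:101, A:102, B:102, out:102]
        System.out.println(trace());
    }
}
```

The trace interleaves the stages per record. Key 100 goes through A, then B, then to the output, before key 101 is read. The sketch does not run all of A before any of B.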

The example below has no real practical use, but it shows clearly what ChainMapper does.

Input file:
100 tom 90
101 mary 85
102 kate 60

Output of Map00 (the record with key 100 is filtered out):
101 mary 85
102 kate 60

Output of Map01 (the record with key 101 is filtered out):
102 kate 60

Reduce output:
102 kate 60
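The stage outputs listed above can be reproduced with a short plain-Java sketch. It applies the same two key filters as Map00 and Map01. `FilterChainExample`, `keyOf` and `dropKey` are hypothetical helper names. The sketch assumes the key is the text before the first space. The intermediate lists are built only so they can be printed. The example's reducer writes every value unchanged, so the reduce output equals the output of the last mapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java walk-through of the sample data (no Hadoop).
public class FilterChainExample {

    // Key = text before the first space, e.g. "100" for "100 tom 90".
    static String keyOf(String line) {
        int pos = line.indexOf(' ');
        return pos < 0 ? line : line.substring(0, pos);
    }

    // Keep every line whose key differs from 'dropKey' (what Map00/Map01 do).
    static List<String> dropKey(List<String> lines, String dropKey) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            if (!keyOf(line).equals(dropKey)) {
                kept.add(line);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("100 tom 90", "101 mary 85", "102 kate 60");
        List<String> afterMap00 = dropKey(input, "100");
        List<String> afterMap01 = dropKey(afterMap00, "101");
        System.out.println(afterMap00); // [101 mary 85, 102 kate 60]
        System.out.println(afterMap01); // [102 kate 60]
    }
}
```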

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;

public class WordCount
{

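    // Map00: first mapper in the chain; drops the record whose key is "100"
    public static class Map00 extends MapReduceBase implements Mapper<Text, Text, Text, Text>
    {
        private final Text ft = new Text("100");

        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            // forward every record except the one being filtered out
            if (!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }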

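    // Map01: second mapper in the chain; receives Map00's output and drops key "101"
    public static class Map01 extends MapReduceBase implements Mapper<Text, Text, Text, Text>
    {
        private final Text ft = new Text("101");

        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            // forward every record except the one being filtered out
            if (!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }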

    // Reduce: identity reducer, writes every value it receives unchanged
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            while (values.hasNext())
            {
                output.collect(key, values.next());
            }
        }
    }

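    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount00");

        // KeyValueTextInputFormat splits each line at the first separator into key and value.
        // The separator defaults to a tab; the sample input above is space-separated, so a
        // single space is assumed here. Remove this line if the input is tab-separated.
        conf.set("key.value.separator.in.input.line", " ");
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // addMapper is static; each call appends one mapper to the chain.
        // The last argument is a private JobConf used only by that mapper;
        // byValue = true passes copies of key/value to the next mapper.
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map00.class, Text.class, Text.class, Text.class, Text.class, true, mapAConf);

        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map01.class, Text.class, Text.class, Text.class, Text.class, true, mapBConf);

        conf.setReducerClass(Reduce.class);

        // the reducer emits <Text, Text>
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // submit the job and wait for it to finish
        JobClient.runJob(conf);
    }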
}
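Map00 and Map01 compare the whole key against "100" and "101", so the result depends on how KeyValueTextInputFormat splits each line. It cuts the line at the first occurrence of the separator, which is a tab unless configured otherwise. The part before the separator becomes the key and the rest becomes the value. A line without the separator becomes the key with an empty value. The following plain-Java sketch models that rule. `KeyValueSplit` is a hypothetical helper, not Hadoop's `KeyValueLineRecordReader`, and it assumes a single-character separator.

```java
// Plain-Java model of the key/value split rule; not a Hadoop class.
public class KeyValueSplit {

    // Returns {key, value}: text before the first separator is the key, the rest
    // is the value; if the separator is absent, the whole line is the key.
    static String[] split(String line, String separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, pos), line.substring(pos + separator.length())};
    }

    public static void main(String[] args) {
        String[] tabDefault = split("100 tom 90", "\t");
        String[] space = split("100 tom 90", " ");
        System.out.println("[" + tabDefault[0] + "] [" + tabDefault[1] + "]"); // [100 tom 90] []
        System.out.println("[" + space[0] + "] [" + space[1] + "]");           // [100] [tom 90]
    }
}
```

With the default tab separator, a space-separated line like `100 tom 90` becomes one whole key. In that case neither mapper's filter matches it.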

 

Summary:

  1. In one sentence: ChainMapper runs several Mappers one after another, inside a single map task, before the Reduce phase.

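  2. The key/value types must line up along the chain. The output types of each mapper must match the input types of the next one, and the output types of the last mapper must match the reducer's input types. In this example every mapper reads and writes <Text, Text>.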
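The type rule in point 2 can be checked mechanically. Walk the chain and compare each mapper's output key/value classes with the next mapper's input classes. This plain-Java sketch does that with `Class.isAssignableFrom`. `Stage` and `firstMismatch` are hypothetical names, and ordinary Java classes stand in for Hadoop types such as `Text`. The sketch only models the rule and does not reproduce Hadoop's own validation code.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of chain type compatibility; not Hadoop code.
public class ChainTypeCheck {

    static final class Stage {
        final String name;
        final Class<?> inKey, inValue, outKey, outValue;

        Stage(String name, Class<?> inKey, Class<?> inValue, Class<?> outKey, Class<?> outValue) {
            this.name = name;
            this.inKey = inKey;
            this.inValue = inValue;
            this.outKey = outKey;
            this.outValue = outValue;
        }
    }

    // Returns the name of the first stage whose input types cannot accept the
    // previous stage's output types, or null if the whole chain lines up.
    static String firstMismatch(List<Stage> chain) {
        for (int i = 1; i < chain.size(); i++) {
            Stage prev = chain.get(i - 1);
            Stage cur = chain.get(i);
            boolean keyOk = cur.inKey.isAssignableFrom(prev.outKey);
            boolean valueOk = cur.inValue.isAssignableFrom(prev.outValue);
            if (!keyOk || !valueOk) {
                return cur.name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Like the article: both mappers read and write (String, String).
        List<Stage> same = Arrays.asList(
            new Stage("Map00", String.class, String.class, String.class, String.class),
            new Stage("Map01", String.class, String.class, String.class, String.class));
        // Types may change along the chain as long as neighbours line up.
        List<Stage> changing = Arrays.asList(
            new Stage("Parse", String.class, String.class, String.class, Integer.class),
            new Stage("Scale", String.class, Number.class, String.class, Double.class));
        // The second stage expects Integer keys, but the first emits String keys.
        List<Stage> broken = Arrays.asList(
            new Stage("Map00", String.class, String.class, String.class, String.class),
            new Stage("MapX", Integer.class, String.class, String.class, String.class));
        System.out.println(firstMismatch(same));     // null
        System.out.println(firstMismatch(changing)); // null
        System.out.println(firstMismatch(broken));   // MapX
    }
}
```

The `changing` chain shows that the mappers do not all need identical types. Only adjacent stages have to agree.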


Original article: http://www.cnblogs.com/not-NULL/p/5073926.html
