输入源数据样例:
Source1-0001 Source2-0002 Source1-0003 Source2-0004 Source1-0005 Source2-0006 Source3-0007 Source3-0008
描述:
输出要求:
程序实现:
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.mahout.common.AbstractJob;
import com.yhd.common.util.HadoopUtil;
/**
 * AbstractJob 是 Mahout 提供的 Job 模板,也可以不使用该模板;
 * 本示例真正的核心在于 MultipleOutputs 的用法(按前缀输出到多个命名文件)。
 *
 * @author ouyangyewei
 *
 */
public class TestMultipleOutputsJob extends AbstractJob {
@Override
public int run(String[] args) throws Exception {
    // Register the standard --input / --output options provided by AbstractJob.
    addInputOption();
    addOutputOption();

    Map<String, List<String>> parsedArgs = parseArguments(args);
    if (parsedArgs == null) {
        // parseArguments already printed usage/help; signal failure to the caller.
        return -1;
    }

    // Remove any stale output so the job does not fail on an existing directory.
    HadoopUtil.delete(getConf(), getOutputPath());

    Configuration conf = new Configuration();
    conf.set("mapred.job.queue.name", "pms");
    conf.set("mapred.child.java.opts", "-Xmx3072m");
    conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05");
    // NOTE(review): the original also set mapred.reduce.tasks=4, but that was
    // dead configuration — setNumReduceTasks(0) below makes this a map-only job.

    Job job = new Job(new Configuration(conf));
    job.setJobName("TestMultipleOutputsJob");
    job.setJarByClass(TestMultipleOutputsJob.class);
    job.setMapperClass(MultipleMapper.class);
    job.setNumReduceTasks(0); // map-only: output files are named *-m-*

    FileInputFormat.setInputPaths(job, this.getInputPath());
    FileOutputFormat.setOutputPath(job, this.getOutputPath());

    /** Output files will be named: Source1-m-**** */
    MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class);
    /** Output files will be named: Source2-m-**** */
    MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class);

    boolean succeeded = job.waitForCompletion(true);
    return succeeded ? 0 : -1;
}
/**
 * Map-only mapper that routes each input token to a named output based on its
 * "SourceN" prefix: Source1 records go to the "Source1" output, Source2 records
 * to "Source2", and Source3 records (the intersection set) to both.
 *
 * @author ouyangyewei
 */
public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> {
    private MultipleOutputs<Text, Text> mos = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split("-");
        if (tokens.length < 2) {
            // Malformed record (no '-' separator): skip instead of throwing
            // ArrayIndexOutOfBoundsException, which would fail the whole task.
            return;
        }
        Text outKey = new Text(tokens[0]);
        if ("Source1".equals(tokens[0])) {
            /** Data belonging to set A */
            mos.write("Source1", outKey, tokens[1]);
        } else if ("Source2".equals(tokens[0])) {
            /** Data belonging to set B */
            mos.write("Source2", outKey, tokens[1]);
        } else if ("Source3".equals(tokens[0])) {
            /** Data in the intersection of A and B: written to both outputs */
            mos.write("Source1", outKey, tokens[1]);
            mos.write("Source2", outKey, tokens[1]);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Must close MultipleOutputs, otherwise named-output files are not flushed.
        mos.close();
    }
}
/**
 * Command-line entry point. Forces the JDK-internal XML parser factories
 * (works around classpath conflicts with other XML parsers on the cluster),
 * then runs the job and propagates its exit status to the shell.
 *
 * @param args --input and --output paths, parsed by AbstractJob
 */
public static void main(String[] args) {
    System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
            "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
    System.setProperty("javax.xml.parsers.SAXParserFactory",
            "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
    TestMultipleOutputsJob instance = new TestMultipleOutputsJob();
    try {
        // Propagate the job's return code so shell scripts can detect failure
        // (the original discarded it and always exited 0).
        System.exit(instance.run(args));
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
    }
}
}
hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob --input /user/pms/workspace/ouyangyewei/testMultipleOutputs --output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output Found 4 items -rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000 -rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000 -rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS -rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000 Source1 0001 Source1 0003 Source1 0005 Source3 0007 Source3 0008 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000 Source2 0002 Source2 0004 Source2 0006 Source3 0007 Source3 0008
MapReduce-MultipleOutputs实现自定义输出到多个目录
原文:http://blog.csdn.net/yeweiouyang/article/details/41956121