[grid@hadoop1 ~]$ hadoop fs -cat ./in/genealogy.txt Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip Alma Mark Terry Mark Alma
程序:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the single-table self join. Input (via KeyValueTextInputFormat)
 * is one "child parent" pair per line: key = child, value = parent.
 *
 * Each record is emitted twice so the reducer can join the table with itself:
 *   - keyed by parent, tagged "1" + child  (left side: who this person's children are)
 *   - keyed by child,  tagged "2" + parent (right side: who this person's parents are)
 * The reducer strips the one-character tag to rebuild the names.
 */
public class SelfJoinMapper extends Mapper<Text, Text, Text, Text> {
    // Reused output buffer; context.write serializes immediately, so mutating
    // it afterwards is safe and avoids two Text allocations per record.
    private final Text outValue = new Text();

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // Left table: parent is the join key, value carries the tagged child.
        outValue.set("1" + key.toString());
        context.write(value, outValue);
        // Right table: child is the join key, value carries the tagged parent.
        outValue.set("2" + value.toString());
        context.write(key, outValue);
    }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Reducer for the single-table self join. For each person P (the reduce key),
 * tag "1" values are P's children and tag "2" values are P's parents; the
 * Cartesian product of the two lists yields (grandchild, grandparent) pairs.
 */
public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {
    // Reused output buffers to avoid per-pair Text allocations.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> grandchildren = new ArrayList<String>();
        List<String> grandparents = new ArrayList<String>();
        for (Text value : values) {
            // Materialize once instead of calling toString() up to three times per record.
            String record = value.toString();
            if (record.isEmpty()) {
                continue; // defensive: substring(1) would throw on a malformed empty record
            }
            if (record.charAt(0) == '1') {
                grandchildren.add(record.substring(1)); // children of `key`
            } else {
                grandparents.add(record.substring(1));  // parents of `key`
            }
        }
        // Every child of `key` paired with every parent of `key` is a
        // grandchild/grandparent relation.
        for (String child : grandchildren) {
            for (String grand : grandparents) {
                outKey.set(child);
                outValue.set(grand);
                context.write(outKey, outValue);
            }
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the single-table self-join job.
 *
 * Usage: SelfJoin &lt;input path&gt; &lt;output path&gt;
 * Input lines are "child parent" pairs separated by a single space; the job
 * emits (grandchild, grandparent) pairs.
 */
public class SelfJoin {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SelfJoin <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        // KeyValueTextInputFormat splits each line into (key, value) at the first
        // occurrence of this separator; our records use a single space.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        // Job.getInstance replaces the deprecated `new Job(conf)` constructor and
        // sets the job name in the same call.
        Job job = Job.getInstance(conf, "SelfJoin");
        job.setJarByClass(SelfJoin.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SelfJoinMapper.class);
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
查看程序输出如下:
[grid@hadoop1 ~]$ hadoop fs -cat ./out/9/part-r-00000 Tom Alice Tom Jesse Jone Alice Jone Jesse Tom Mary Tom Ben Jone Mary Jone Ben Philip Alice Philip Jesse Mark Alice Mark Jesse
原文:http://my.oschina.net/zc741520/blog/374222