案例一:单词统计
public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WordCount.class); //set mapper`s property job.setMapperClass(WCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/words.txt")); //set reducer`s property job.setReducerClass(WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output")); //submit job.waitForCompletion(true); } } class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long counter = 0; for(LongWritable l : values){ counter += l.get(); } context.write(key, new LongWritable(counter)); } } class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); for(String w : words){ context.write(new Text(w), new LongWritable(1)); } } }
案例二:流量统计
public class DataCount { public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); String tel = fields[1]; long up = Long.parseLong(fields[8]); long down = Long.parseLong(fields[9]); DataBean bean = new DataBean(tel, up, down); context.write(new Text(tel), bean); } } public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{ @Override protected void reduce(Text key, Iterable<DataBean> values, Context context) throws IOException, InterruptedException { long up_sum = 0; long down_sum = 0; for(DataBean bean : values){ up_sum += bean.getUpPayLoad(); down_sum += bean.getDownPayLoad(); } DataBean bean = new DataBean("", up_sum, down_sum); context.write(key, bean); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(DataCount.class); job.setMapperClass(DCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DataBean.class); FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/flowData.txt")); job.setReducerClass(DCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DataBean.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output")); job.waitForCompletion(true); } } class DataBean implements Writable{ private String tel; private long upPayLoad; private long downPayLoad; private long totalPayLoad; public DataBean(){} public DataBean(String tel, long upPayLoad, long downPayLoad) { super(); this.tel = tel; this.upPayLoad = upPayLoad; this.downPayLoad = downPayLoad; this.totalPayLoad = upPayLoad + downPayLoad; } public String toString() { return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad; } // notice : 1 type 2 order public void write(DataOutput out) throws IOException { out.writeUTF(tel); out.writeLong(upPayLoad); out.writeLong(downPayLoad); out.writeLong(totalPayLoad); } public void readFields(DataInput in) throws IOException { this.tel = in.readUTF(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); this.totalPayLoad = in.readLong(); } public String getTel() { return tel; } public void setTel(String tel) { this.tel = tel; } public long getUpPayLoad() { return upPayLoad; } public void setUpPayLoad(long upPayLoad) { this.upPayLoad = upPayLoad; } public long getDownPayLoad() { return downPayLoad; } public void setDownPayLoad(long downPayLoad) { this.downPayLoad = downPayLoad; } public long getTotalPayLoad() { return totalPayLoad; } public void setTotalPayLoad(long totalPayLoad) { this.totalPayLoad = totalPayLoad; } }
原文:http://www.cnblogs.com/fengmingyue/p/6354490.html