1-MapReduce-Counters
SortMapper.java
package com.mapreduce_sort;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
public class SortMapper extends Mapper<LongWritable, Text, PairWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Custom counter: group "MR_COUNT", counter name "MapReduceCounter"
        Counter counter = context.getCounter("MR_COUNT", "MapReduceCounter");
        // Incremented once per map() call; the L suffix marks a long literal
        counter.increment(1L);
        // 1. Split the line and wrap the two fields in a PairWritable, which becomes k2
        String[] split = value.toString().split("\t");
        PairWritable pairWritable = new PairWritable();
        pairWritable.setFirst(split[0]);
        pairWritable.setSecond(Integer.parseInt(split[1].trim()));
        // 2. Write k2 and v2 to the context
        context.write(pairWritable, value);
    }
}
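Both the mapper above and the reducer below depend on a PairWritable key class that the post does not show. A minimal sketch of such a class, assuming first holds the text field, second holds the numeric field, and records sort by first and then by second:

PairWritable.java (sketch, not from the original post)
package com.mapreduce_sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class PairWritable implements WritableComparable<PairWritable> {
    private String first;
    private int second;

    public void setFirst(String first) { this.first = first; }
    public void setSecond(int second) { this.second = second; }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize both fields in a fixed order
        out.writeUTF(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialize in the same order they were written
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    @Override
    public int compareTo(PairWritable other) {
        // Sort by first, then by second
        int result = this.first.compareTo(other.first);
        if (result == 0) {
            result = Integer.compare(this.second, other.second);
        }
        return result;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}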
SortReducer.java
package com.mapreduce_sort;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SortReducer extends Reducer<PairWritable, Text, PairWritable, NullWritable> {

    /**
     * Two identical input lines such as
     *   a 1
     *   a 1
     * arrive here as one key (a, 1) with the value list <a 1, a 1>.
     */

    // Custom counters defined with an enum (two counters)
    public static enum MyCounter {
        REDUCE_INPUT_KEY_RECORDS, REDUCE_INPUT_VALUE_RECORDS
    }

    @Override
    protected void reduce(PairWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Count the number of keys seen in the reduce phase
        context.getCounter(MyCounter.REDUCE_INPUT_KEY_RECORDS).increment(1L);
        // Iterate so that duplicate records (e.g. two "a 1" lines) are each written out
        for (Text value : values) {
            // Count the number of values seen in the reduce phase
            context.getCounter(MyCounter.REDUCE_INPUT_VALUE_RECORDS).increment(1L);
            // NullWritable.get() returns the singleton empty value
            context.write(key, NullWritable.get());
        }
    }
}
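How these counters are read back is not shown in the post; both appear in the console output when the job finishes, and they can also be fetched in the driver. A minimal sketch, assuming the driver's submitted Job instance is named job:

// Sketch (not from the original post): read the counters back in the driver
// after the job completes; "job" is assumed to be the submitted Job instance.
boolean success = job.waitForCompletion(true);
long mapCalls = job.getCounters()
        .findCounter("MR_COUNT", "MapReduceCounter").getValue();
long reduceKeys = job.getCounters()
        .findCounter(SortReducer.MyCounter.REDUCE_INPUT_KEY_RECORDS).getValue();
System.out.println("map() calls: " + mapCalls + ", reduce keys seen: " + reduceKeys);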
====================================================================================================================================================
2-MapReduce-Combiner-Principle Analysis
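A combiner runs the reducer logic locally on the output of each map task, before the data is shuffled across the network. Instead of sending every individual <word, 1> pair to the reducers, each map task sends one pre-aggregated <word, partialCount> pair per word, which reduces the amount of intermediate data transferred during the shuffle and improves network efficiency. The combiner's input and output types must match the map output types (k2 and v2), and its logic must produce the same final result whether it runs zero, one, or many times, so it only suits operations such as summation that are associative and commutative.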
============================================================================================================================================
3-MapReduce-Combiner-Code Implementation
WordCountMapper.java
package com.mapreduce_combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper generics:
 * KEYIN:    type of k1 - the byte offset of the line, LongWritable
 * VALUEIN:  type of v1 - one line of text, Text
 * KEYOUT:   type of k2 - a single word, Text
 * VALUEOUT: type of v2 - the fixed value 1, LongWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * The map method turns k1/v1 into k2/v2.
     * key:     k1
     * value:   v1
     * context: the MapReduce context object
     *
     * k1   v1
     * 0    hello,world
     * 11   hello,hadoop
     * ------------------------------------------
     * k2      v2
     * hello   1
     * world   1
     * hadoop  1
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable writable = new LongWritable();
        // 1. Split the line on commas
        String line = value.toString();
        String[] split = line.split(",");
        // 2. Iterate over the words; context connects the map output to the next phase
        for (String word : split) {
            text.set(word);
            writable.set(1);
            context.write(text, writable);
        }
    }
}
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
WordCountReducer.java
package com.mapreduce_combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * KEYIN:    k2, Text - a single word
 * VALUEIN:  v2, LongWritable - the element type of the value collection
 * KEYOUT:   k3, Text - a single word
 * VALUEOUT: v3, LongWritable - how many times the word occurs
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * The reduce method turns k2/v2 into k3/v3.
     * key:     k2
     * value:   the collection of v2 values
     * context: the MapReduce context object
     *
     * new k2   v2
     * hello    <1,1>
     * world    <1,1>
     * hadoop   <1,1,1>
     * -----------------------------
     * k3       v3 (sum of the collection)
     * hello    2
     * world    2
     * hadoop   3
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1. Iterate over the values collection
        for (LongWritable value : values) {
            // 2. Add up the values
            count += value.get();
        }
        // 3. Write k3 and v3 to the context
        context.write(key, new LongWritable(count));
    }
}
==========================================================================================================
MyCombiner.java
package com.mapreduce_combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// Combiner: pre-aggregates map output locally to cut the amount of data sent over the network
public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1. Iterate over the values collection
        for (LongWritable value : values) {
            // 2. Add up the values
            count += value.get();
        }
        // 3. Write the partial sum for this key
        context.write(key, new LongWritable(count));
    }
}
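Because MyCombiner's logic is identical to WordCountReducer's, the driver below could just as well pass WordCountReducer.class to job.setCombinerClass(); a separate class is kept here only to make the combiner step explicit. Reusing the reducer as the combiner is safe only when, as with the summation here, the operation is associative and commutative and the reducer's input and output types are the same.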
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
JobMain.java
package com.mapreduce_combiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Create the job object
        Job job = Job.getInstance(super.getConf(), "mapreduce_wordcount");
        // Required when the jar is submitted to the cluster
        job.setJarByClass(JobMain.class);

        // Configure the job
        // Step 1: set the input format that produces k1 and v1 (TextInputFormat reads plain text)
        job.setInputFormatClass(TextInputFormat.class);
        // Where to read from
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));

        // Step 2: set the Mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the map output types (k2 and v2)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3-6 (shuffle): default partitioning, sorting, combining, and grouping,
        // except that we plug in our own combiner class here
        job.setCombinerClass(MyCombiner.class);

        // Step 7: set the Reducer class
        job.setReducerClass(WordCountReducer.class);
        // Set the reduce output types (k3 and v3)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Step 8: set the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // Where to write; note: the output folder (wordcount_combiner) must not already exist
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/wordcount_combiner"));

        // Submit the job and block until it finishes
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job; a return value of 0 means success
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.out.println(run);
    }
}
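Because the job fails when the output folder already exists, a common guard (shown here only as a sketch, not part of the original post; it needs an additional import of org.apache.hadoop.fs.FileSystem) is to delete the old output inside run() before submitting:

// Sketch: remove a leftover output directory so the job can be rerun.
Path outputPath = new Path("hdfs://node01:8020/wordcount_combiner");
FileSystem fileSystem = outputPath.getFileSystem(super.getConf());
if (fileSystem.exists(outputPath)) {
    fileSystem.delete(outputPath, true); // true = recursive delete
}
TextOutputFormat.setOutputPath(job, outputPath);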
Screenshot of the run after adding the combiner code: (screenshot not reproduced here)
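Whether the combiner actually ran can also be checked in the counters printed at the end of the job: under "Map-Reduce Framework", "Combine input records" and "Combine output records" become non-zero, and "Reduce input records" drops compared to a run without the combiner.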
Original post: https://www.cnblogs.com/curedfisher/p/12603200.html