A typical MapReduce WordCount program looks like this:
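// WcMapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        // Split the input line on spaces and emit (word, 1) for each token.
        String[] words = value.toString().split(" ");
        for (int i = 0; i < words.length; i++) {
            ctx.write(new Text(words[i]), new LongWritable(1L));
        }
    }
}

// WcReduer.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReduer extends Reducer<Text, LongWritable, Text, LongWritable> {
    // Reused across calls to avoid allocating a new Writable per key.
    private final LongWritable count = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context ctx) throws IOException, InterruptedException {
        // Sum all the 1s emitted for this word.
        long sum = 0L;
        for (LongWritable v : values) {
            sum += v.get();
        }
        count.set(sum);
        ctx.write(key, count);
    }
}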
The driver code for the job:
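import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobClient {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(JobClient.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJobName("wordcount");
        // Note: no map output key/value classes are set anywhere in this driver.
        FileInputFormat.addInputPath(job, new Path("/daxin/hadoop-mapreduce/words"));
        FileOutputFormat.setOutputPath(job, new Path("/daxin/hadoop-mapreduce/wordcount-result"));
        job.waitForCompletion(true);
    }
}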
Submitting the job fails with the following error:
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.daxin.blog.WcMapper.map(WcMapper.java:20)
    at com.daxin.blog.WcMapper.map(WcMapper.java:13)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
The stack trace tells us where in the Hadoop source the error is thrown: org.apache.hadoop.mapred.MapTask.MapOutputBuffer#collect. The relevant part of that method is:
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
    reporter.progress();
    if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
    }
    if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
    }
    .....
}
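One detail of this check is easy to miss: it compares classes with !=, so the runtime class of the key has to be exactly the configured class, and even a subclass would be rejected. Here is a minimal plain-Java sketch of that behavior. Key, TextKey and SpecialTextKey are hypothetical stand-ins made up so the example runs without Hadoop on the classpath; only the shape of the check is taken from the source above.

```java
import java.io.IOException;

// Hypothetical stand-in types (NOT Hadoop classes), used only for illustration.
class Key {}
class TextKey extends Key {}
class SpecialTextKey extends TextKey {}

class ExactClassCheck {
    // Same shape as the key check in collect(): the runtime class must be
    // identical to the expected class, not merely assignable to it.
    static void checkKey(Object key, Class<?> keyClass) throws IOException {
        if (key.getClass() != keyClass) {
            throw new IOException("Type mismatch in key from map: expected "
                    + keyClass.getName() + ", received "
                    + key.getClass().getName());
        }
    }

    // Convenience wrapper: true if the check passes, false if it throws.
    static boolean accepts(Object key, Class<?> keyClass) {
        try {
            checkKey(key, keyClass);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(new TextKey(), TextKey.class));         // exact class
        System.out.println(accepts(new SpecialTextKey(), TextKey.class));  // subclass
        System.out.println(TextKey.class.isInstance(new SpecialTextKey())); // looser test, for contrast
    }
}
```

Running main prints true, false, true: the exact class passes, the subclass fails the != comparison, and an isInstance test (which collect does not use) would have let the subclass through.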
Here key.getClass() is clearly Text, so what remains is to find out what keyClass is. Looking further, keyClass is assigned as follows:
keyClass = (Class<K>)job.getMapOutputKeyClass();
The source of getMapOutputKeyClass:
public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
        retv = getOutputKeyClass();
    }
    return retv;
}
MAP_OUTPUT_KEY_CLASS is the configuration key that holds the map output key type. Our driver never sets it, so the lookup returns the default value null, and getOutputKeyClass is called next:
public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
}

public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
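getOutputKeyClass reads OUTPUT_KEY_CLASS, which holds the key type of the job's final output. We did not set that either, so it falls back to the default, LongWritable. Our MapTask, however, actually emits Text keys, which is what produces the type mismatch error above. The map output value has the same problem: its lookup falls back to the job output value class, whose default is Text, while our mapper emits LongWritable. The fix is to set the MapTask's output key and value types explicitly: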
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
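To see why these two calls fix the problem, the fallback chain traced above can be modeled in a few lines of plain Java. This is only a sketch under stated assumptions, not Hadoop's implementation: KeyClassResolution is a hypothetical class, the configuration is a plain HashMap, and class names are kept as strings so the example runs without Hadoop. The property name used for MAP_OUTPUT_KEY_CLASS is what I believe Hadoop uses, but it does not appear in the source quoted above, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the key-class lookup; not part of Hadoop.
class KeyClassResolution {
    // Assumed property name (not shown in the quoted source).
    static final String MAP_OUTPUT_KEY_CLASS = "mapreduce.map.output.key.class";
    // Property name as quoted in the source above.
    static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
    static final String DEFAULT_KEY_CLASS = "org.apache.hadoop.io.LongWritable";

    private final Map<String, String> conf = new HashMap<>();

    void setMapOutputKeyClass(String className) {
        conf.put(MAP_OUTPUT_KEY_CLASS, className);
    }

    void setOutputKeyClass(String className) {
        conf.put(OUTPUT_KEY_CLASS, className);
    }

    // Job output key class: falls back to LongWritable when unset.
    String getOutputKeyClass() {
        return conf.getOrDefault(OUTPUT_KEY_CLASS, DEFAULT_KEY_CLASS);
    }

    // Map output key class: falls back to the job output key class when unset.
    String getMapOutputKeyClass() {
        String retv = conf.get(MAP_OUTPUT_KEY_CLASS);
        if (retv == null) {
            retv = getOutputKeyClass();
        }
        return retv;
    }

    public static void main(String[] args) {
        KeyClassResolution unset = new KeyClassResolution();
        System.out.println(unset.getMapOutputKeyClass()); // nothing set: default applies

        KeyClassResolution fixed = new KeyClassResolution();
        fixed.setMapOutputKeyClass("org.apache.hadoop.io.Text");
        System.out.println(fixed.getMapOutputKeyClass()); // explicit map output key class
    }
}
```

In this model, a job with nothing set resolves the map output key class to LongWritable, which is exactly the "expected" type in the error message, and setting the map output key class makes it resolve to Text. The same fallback also suggests that setting only the job output key class to Text would clear the key mismatch here, since the mapper and reducer emit the same key type; setting the map output types explicitly is the more direct fix.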
Why MapReduce needs setMapOutputKeyClass and setMapOutputValueClass
Original post: https://www.cnblogs.com/leodaxin/p/8831092.html