The MapTask class

Source: http://www.cnblogs.com/xuanlvshu/p/5750405.html
In MapTask.run(), jobs that use the new API enter runNewMapper():

if (useNewApi) {
  runNewMapper(job, splitMetaInfo, umbilical, reporter);
}

@SuppressWarnings("unchecked")
private <INKEY, INVALUE, OUTKEY, OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
                                                                getTaskID(),
                                                                reporter);
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
                          splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input =
    new NewTrackingRecordReader<INKEY, INVALUE>
      (split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;

  // get an output object
  if (job.getNumReduceTasks() == 0) {
    // no reduce tasks: write map output directly
    output =
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    // at least one reduce task: use the sorting, partitioned collector
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
  mapContext =
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
        input, output,
        committer,
        reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context
      mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

The branch to focus on is how the output object is chosen: when the job has no reduce tasks, a NewDirectOutputCollector writes map output directly; when there is at least one reduce task, a NewOutputCollector is created, and its constructor is where the partitioner comes in:

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K, V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K, V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}

With more than one reduce task, the partitioner class configured for the job is instantiated by reflection. With exactly one, the configured class is ignored and an anonymous partitioner sends every record to partition partitions - 1, that is, partition 0.
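To see what that choice means in practice, here is a minimal, self-contained sketch in plain Java (no Hadoop dependency; the Partitioner interface, choosePartitioner, and PartitionSelectionSketch are names invented for this illustration) that mirrors the constructor's logic:

// Mirrors NewOutputCollector's partitioner choice: more than one reduce
// task -> hash-based routing; exactly one -> a constant partitioner.
public class PartitionSelectionSketch {

  interface Partitioner<K, V> {
    int getPartition(K key, V value, int numPartitions);
  }

  static <K, V> Partitioner<K, V> choosePartitioner(final int partitions) {
    if (partitions > 1) {
      // stand-in for the reflective lookup of the configured class;
      // this is the default HashPartitioner logic
      return (key, value, n) -> (key.hashCode() & Integer.MAX_VALUE) % n;
    }
    // single reducer: the configured partitioner is never consulted,
    // every record goes to partition partitions - 1 == 0
    return (key, value, n) -> partitions - 1;
  }

  public static void main(String[] args) {
    Partitioner<String, String> four = choosePartitioner(4);
    Partitioner<String, String> one = choosePartitioner(1);
    for (String key : new String[] {"alice", "bob", "carol"}) {
      System.out.printf("%s -> %d of 4 partitions, %d of 1%n",
          key, four.getPartition(key, null, 4), one.getPartition(key, null, 1));
    }
  }
}

The practical consequence: with a single reduce task, setting a custom partitioner on the job is a no-op.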
The partitioner class the reflective branch instantiates comes from JobContext.getPartitionerClass():

/**
 * Get the {@link Partitioner} class for the job.
 *
 * @return the {@link Partitioner} class for the job.
 */
public Class<? extends Partitioner<?, ?>> getPartitionerClass()
    throws ClassNotFoundException;

The implementation reads the class from the job configuration and falls back to HashPartitioner when none is set:

/**
 * Get the {@link Partitioner} class for the job.
 *
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?, ?>> getPartitionerClass()
    throws ClassNotFoundException {
  return (Class<? extends Partitioner<?, ?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}

The default HashPartitioner simply hashes the key:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

Because partitioning is driven by hashCode(), a custom key class should override it deliberately. For example, hashing only on the account field:

@Override
public int hashCode() {
  final int prime = 31;
  int result = 1;
  result = prime * result + ((account == null) ? 0 : account.hashCode());
  // result = prime * result + ((amount == null) ? 0 : amount.hashCode());
  return result;
}

Hash partitioning balances load but says nothing about global order across reducers. To get globally ordered output, the partitioner itself has to route key ranges to specific reducers, which means the number of reduce tasks must be known in advance. This custom partitioner splits accounts into three ranges by their trailing digit:

public static class KeyPartitioner extends Partitioner<SelfKey, DoubleWritable> {

  @Override
  public int getPartition(SelfKey key, DoubleWritable value, int numPartitions) {
    /**
     * To guarantee globally ordered output we need custom routing logic,
     * and the number of reduce tasks must be known up front.
     * \w matches a word character: [a-zA-Z_0-9]
     */
    String account = key.getAccount();
    // accounts look like 0xxaaabbb, ending in a digit 0-9,
    // split into the ranges [0-2], [3-6], [7-9]
    if (account.matches("\\w*[0-2]")) {
      return 0;
    } else if (account.matches("\\w*[3-6]")) {
      return 1;
    } else if (account.matches("\\w*[7-9]")) {
      return 2;
    }
    return 0;
  }
}
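Two notes on the code above. First, the & Integer.MAX_VALUE mask in HashPartitioner is load-bearing: hashCode() can be negative, and Java's % keeps the sign of the dividend, so without the mask a partition number could come out negative. A plain-Java check (MaskDemo is a name made up for this demo):

public class MaskDemo {
  public static void main(String[] args) {
    int numReduceTasks = 3;
    int hash = -7;  // a perfectly legal (negative) hashCode()
    System.out.println(hash % numReduceTasks);
    // prints -1: not a valid partition index
    System.out.println((hash & Integer.MAX_VALUE) % numReduceTasks);
    // prints 1: masking the sign bit keeps the result in [0, numReduceTasks)
  }
}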
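Second, KeyPartitioner only takes effect if the job registers it and runs with exactly three reduce tasks; recall from NewOutputCollector that with a single reducer the configured class is ignored, and with fewer than three reducers getPartition() could return a partition that does not exist. A hypothetical driver sketch, assuming the SelfKey and KeyPartitioner classes from this post (the rest is the standard org.apache.hadoop.mapreduce.Job API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;

public class SortedAccountsDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "sorted-accounts");
    job.setMapOutputKeyClass(SelfKey.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    // route the three account ranges to three reducers
    job.setPartitionerClass(KeyPartitioner.class);
    job.setNumReduceTasks(3);
    // ... mapper, reducer, input/output formats and paths omitted ...
  }
}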