Among the phases of a Hadoop job's execution, the Shuffle stage has always been somewhat mysterious. Because Shuffle is a sub-process of the Reduce stage, it is often overlooked, yet the whole Shuffle process acts as the data hand-off in the middle of MapReduce. Precisely because this module is so important, Hadoop makes it pluggable: users can write their own Shuffle module tailored to the characteristics of their applications. Having recently skimmed the relevant code, I am writing down some of what I learned.
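Since the plugin class is read from the job configuration (as the ReduceTask code below will show), swapping in a custom implementation is just a matter of setting one property. A minimal sketch, assuming a hypothetical MyShuffle class that implements ShuffleConsumerPlugin:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.MRConfig;

JobConf job = new JobConf();
// MyShuffle is a hypothetical user-supplied plugin implementing ShuffleConsumerPlugin;
// MRConfig.SHUFFLE_CONSUMER_PLUGIN is "mapreduce.job.reduce.shuffle.consumer.plugin.class".
job.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
    MyShuffle.class, ShuffleConsumerPlugin.class);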
Shuffle is the opening act of the Reduce stage. Put simply, it is a process of "remote data copying", and the data being copied are the intermediate outputs of the map tasks. Before the reduce process can do any further work, it first has to get hold of this data. Map intermediate result files are generally written out on the node where the map task ran, so a reduce task's copying usually goes over the network, and when the volume is large it can consume a fair amount of bandwidth.
Below is a brief source-level look at this module's code. Since this stage belongs to the Reduce process, we start from the ReduceTask code.
@Override @SuppressWarnings("unchecked") public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); if (isMapOrReduce()) { copyPhase = getProgress().addPhase("copy"); sortPhase = getProgress().addPhase("sort"); reducePhase = getProgress().addPhase("reduce"); } .... // Initialize the codec codec = initCodec(); RawKeyValueIterator rIter = null; ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class combinerClass = conf.getCombinerClass(); CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job); LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter, shuffledMapsCounter, reduceShuffleBytes, failedShuffleCounter, mergedMapOutputsCounter, taskStatus, copyPhase, sortPhase, this, mapOutputFile, localMapFiles); shuffleConsumerPlugin.init(shuffleContext); rIter = shuffleConsumerPlugin.run(); ....在Reduce Task中的run方法中能够看到shuffle部分的代码.首先是狗仔Shuffle上下文,然后是初始化操作,然后执行shuffle主操作.首先来看shuffle的上下文构造过程,他是一个内部类,在构造的过程中,传入了大量的变量参数,这些变量参数在Shuffle的过程中会被用到,下面是context中的变量定义,这些参数会由外部reduce task的参数传入到context上下文类中:
@InterfaceAudience.LimitedPrivate("mapreduce") @InterfaceStability.Unstable public static class Context<K,V> { private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId; private final JobConf jobConf; private final FileSystem localFS; private final TaskUmbilicalProtocol umbilical; private final LocalDirAllocator localDirAllocator; private final Reporter reporter; private final CompressionCodec codec; private final Class<? extends Reducer> combinerClass; private final CombineOutputCollector<K, V> combineCollector; //与Shuffle过程相关计数器 private final Counters.Counter spilledRecordsCounter; private final Counters.Counter reduceCombineInputCounter; private final Counters.Counter shuffledMapsCounter; private final Counters.Counter reduceShuffleBytes; private final Counters.Counter failedShuffleCounter; private final Counters.Counter mergedMapOutputsCounter; private final TaskStatus status; private final Progress copyPhase; private final Progress mergePhase; private final Task reduceTask; private final MapOutputFile mapOutputFile; private final Map<TaskAttemptID, MapOutputFile> localMapFiles;然后构造完成上下文后,进行init操作,就是下面这行代码:
shuffleConsumerPlugin.init(shuffleContext);

ShuffleConsumerPlugin is an interface, so we need to find the concrete implementation, which in Hadoop 2.7.1 is the class named Shuffle. Here is the initialization it performs:
@InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable @SuppressWarnings({"unchecked", "rawtypes"}) public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter { private static final int PROGRESS_FREQUENCY = 2000; private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MIN_EVENTS_TO_FETCH = 100; private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000; private ShuffleConsumerPlugin.Context context; ..... @Override public void init(ShuffleConsumerPlugin.Context context) { this.context = context; this.reduceId = context.getReduceId(); this.jobConf = context.getJobConf(); this.umbilical = context.getUmbilical(); this.reporter = context.getReporter(); this.metrics = new ShuffleClientMetrics(reduceId, jobConf); this.copyPhase = context.getCopyPhase(); this.taskStatus = context.getStatus(); this.reduceTask = context.getReduceTask(); this.localMapFiles = context.getLocalMapFiles(); scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId, this, copyPhase, context.getShuffledMapsCounter(), context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); merger = createMergeManager(context); }将之前上下文中的参数变量值赋值到自己的内部变量中.还需要关注一下,代码最后一行merge操作类.merge操作会在shuffle阶段尾声阶段进行.下面是执行主方法:
rIter = shuffleConsumerPlugin.run();

Jumping into the concrete implementation:
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
  ......
  // Start the map-completion events fetcher thread
  final EventFetcher<K,V> eventFetcher =
      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
          maxEventsToFetch);
  eventFetcher.start();

  // Start the map-output fetcher threads
  boolean isLocal = localMapFiles != null;
  final int numFetchers = isLocal ? 1 :
      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    fetchers[0].start();
  } else {
    for (int i=0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
                                     reporter, metrics, this,
                                     reduceTask.getShuffleSecret());
      fetchers[i].start();
    }
  }
  ......

In the main shuffle operation we can see that it first checks whether the map outputs are local to this node: if they are, the map file locations are passed in and a single LocalFetcher is used; otherwise several Fetcher threads are created to pull the data remotely.
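As the snippet shows, the fetcher count comes from MRJobConfig.SHUFFLE_PARALLEL_COPIES, i.e. the property mapreduce.reduce.shuffle.parallelcopies, and falls back to 5 when unset. A small sketch of tuning it for a copy-heavy job:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

JobConf job = new JobConf();
// Raise the number of parallel Fetcher threads from the default of 5;
// this has no effect in the local case, which always uses one LocalFetcher.
job.setInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 10);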
So the core copying work is implemented in the Fetcher class; once a Fetcher thread is started, its run() method carries out the main copy operation.

public void run() {
  try {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      MapHost host = null;
      try {
        // If merge is on, block
        merger.waitForResource();

        // Get a host to shuffle from
        host = scheduler.getHost();
        metrics.threadBusy();

        // Shuffle
        copyFromHost(host);
      } finally {
        if (host != null) {
          scheduler.freeHost(host);
          metrics.threadFree();
        }
      }
    }
  } catch (InterruptedException ie) {
    return;
  } catch (Throwable t) {
    exceptionReporter.reportException(t);
  }
}

Moving into copyFromHost: at the beginning of the method, it first obtains the list of map tasks on the target host that have already finished running.
@VisibleForTesting
protected void copyFromHost(MapHost host) throws IOException {
  // reset retryStartTime for a new host
  retryStartTime = 0;
  // Get completed maps on 'host'
  List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
  ....

In other words, the rest of the shuffle stage will fetch data from the nodes where those map tasks ran. Before the copying starts, a "remaining" set is maintained:
// List of maps to be fetched yet
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);

Next, a URL is constructed from these variables and an input stream is opened over it, which also verifies that data actually exists on the map task's host:
// Construct the url and connect
URL url = getMapOutputURL(host, maps);
DataInputStream input = openShuffleUrl(host, remaining, url);
if (input == null) {
  return;
}
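For reference, the URL assembled by getMapOutputURL() points at the NodeManager's ShuffleHandler (default port 13562, property mapreduce.shuffle.port) and batches the attempt IDs of several maps into a single request. An illustrative example with made-up host and IDs:

http://node01:13562/mapOutput?job=job_1443809521420_0001&reduce=0&map=attempt_1443809521420_0001_m_000000_0,attempt_1443809521420_0001_m_000002_0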
The subsequent operations then copy and read the data in a loop:

TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
  try {
    failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
  } catch (IOException e) {
    //
    // Setup connection again if disconnected by NM
    connection.disconnect();
    // Get map output from remaining tasks only.
    url = getMapOutputURL(host, remaining);
    input = openShuffleUrl(host, remaining, url);
    if (input == null) {
      return;
    }
  }
}

On reconnect, the URL is rebuilt and a new input stream is obtained. If any task fails to copy, the loop exits; in the normal case the loop ends naturally once the remaining set of maps is empty. The details of the copying are found in copyMapOutput.
Before the actual copy, the total size to be copied is determined by reading a shuffle header from the input stream.
private TaskAttemptID[] copyMapOutput(MapHost host,
                              DataInputStream input,
                              Set<TaskAttemptID> remaining,
                              boolean canRetry) throws IOException {
  MapOutput<K,V> mapOutput = null;
  TaskAttemptID mapId = null;
  long decompressedLength = -1;
  long compressedLength = -1;

  try {
    long startTime = Time.monotonicNow();
    int forReduce = -1;
    //Read the shuffle header
    try {
      ShuffleHeader header = new ShuffleHeader();
      header.readFields(input);
      mapId = TaskAttemptID.forName(header.mapId);
      compressedLength = header.compressedLength;
      decompressedLength = header.uncompressedLength;
      forReduce = header.forReduce;
  .....
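ShuffleHeader is a plain Writable, so its wire format is easy to see. A hedged sketch of what header.readFields(input) consumes, assuming the 2.7.1 field layout (a length-prefixed attempt-ID string followed by three variable-length integers):

import java.io.DataInputStream;
import org.apache.hadoop.io.WritableUtils;

// Simplified sketch of ShuffleHeader.readFields(); the bound on the string
// read (1000 here) stands in for ShuffleHeader's internal max ID length.
String mapId = WritableUtils.readStringSafely(input, 1000);
long compressedLength = WritableUtils.readVLong(input);
long uncompressedLength = WritableUtils.readVLong(input);
int forReduce = WritableUtils.readVInt(input);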
Based on the size just read, the code then decides whether the data should be copied into memory or onto disk, and obtains a corresponding output object.

// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
    remaining, mapId)) {
  return new TaskAttemptID[] {mapId};
}

if(LOG.isDebugEnabled()) {
  LOG.debug("header: " + mapId + ", len: " + compressedLength +
      ", decomp len: " + decompressedLength);
}

// Get the location for the map output - either in-memory or on-disk
try {
  mapOutput = merger.reserve(mapId, decompressedLength, id);
  ....

The reserve() method is where this decision is made:
@Override
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
                                           long requestedSize,
                                           int fetcher
                                           ) throws IOException {
  if (!canShuffleToMemory(requestedSize)) {
    LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
             " is greater than maxSingleShuffleLimit (" +
             maxSingleShuffleLimit + ")");
    return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
                                    jobConf, mapOutputFile, fetcher, true);
  }
  .....
  if (usedMemory > memoryLimit) {
    LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
        + ") is greater than memoryLimit (" + memoryLimit + ")." +
        " CommitMemory is (" + commitMemory + ")");
    return null;
  }

  // Allow the in-memory shuffle to progress
  LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
      + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
      + "CommitMemory is (" + commitMemory + ")");
  return unconditionalReserve(mapId, requestedSize, true);
}
/**
 * Unconditional Reserve is used by the Memory-to-Memory thread
 * @return
 */
private synchronized InMemoryMapOutput<K, V> unconditionalReserve(
    TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
  usedMemory += requestedSize;
  return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,
                                    codec, primaryMapOutput);
}

Two kinds of objects can be returned: an InMemoryMapOutput for copies that go into memory, and an OnDiskMapOutput for copies that go onto disk.
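The decision hinges on two thresholds that MergeManagerImpl derives from the heap size at construction time. A simplified sketch (not the actual Hadoop code), assuming the default values of mapreduce.reduce.shuffle.input.buffer.percent (0.70) and mapreduce.reduce.shuffle.memory.limit.percent (0.25):

// Simplified stand-in for MergeManagerImpl's canShuffleToMemory() check.
static boolean canShuffleToMemory(long requestedSize) {
  long maxHeap = Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE);
  long memoryLimit = (long) (maxHeap * 0.70);               // total in-memory shuffle budget
  long maxSingleShuffleLimit = (long) (memoryLimit * 0.25); // per-map-output ceiling
  return requestedSize < maxSingleShuffleLimit;             // false -> OnDiskMapOutput
}

So with a 1 GB reduce heap and default settings, any single map output larger than roughly 1024 MB * 0.70 * 0.25, about 179 MB, goes straight to disk.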
Once the destination is settled, the remote shuffle copy itself is performed.

....
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
  // Go!
  LOG.info("fetcher#" + id + " about to shuffle output of map "
      + mapOutput.getMapId() + " decomp: " + decompressedLength
      + " len: " + compressedLength + " to " + mapOutput.getDescription());
  mapOutput.shuffle(host, is, compressedLength, decompressedLength,
      metrics, reporter);
} catch (java.lang.InternalError e) {
  LOG.warn("Failed to shuffle for fetcher#"+id, e);
  throw new IOException(e);
}
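mapOutput.shuffle(...) is where the two reserve() outcomes diverge. Below is a hypothetical, heavily simplified sketch of what the two variants do; the real classes are InMemoryMapOutput, which decompresses into a preallocated byte array, and OnDiskMapOutput, which streams the still-compressed bytes to a local file.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical stand-ins; checksums, IFile framing, and codec handling omitted.
static byte[] shuffleToMemory(InputStream in, int decompressedLength)
    throws IOException {
  // InMemoryMapOutput: fill the byte[] preallocated at reserve() time
  // (decompression omitted for brevity).
  byte[] buf = new byte[decompressedLength];
  int off = 0;
  while (off < buf.length) {
    int n = in.read(buf, off, buf.length - off);
    if (n < 0) throw new IOException("premature EOF");
    off += n;
  }
  return buf;
}

static void shuffleToDisk(InputStream in, OutputStream localFile,
    long compressedLength) throws IOException {
  // OnDiskMapOutput: stream the bytes to a local file; the data stays
  // compressed on disk until the final merge.
  byte[] buf = new byte[128 * 1024];
  long remaining = compressedLength;
  while (remaining > 0) {
    int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
    if (n < 0) throw new IOException("premature EOF");
    localFile.write(buf, 0, n);
    remaining -= n;
  }
}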
Finally, after all of these operations complete, a merge is performed so that memory usage does not grow too large and the number of small files on disk stays manageable. It is triggered by the merger.close() call near the end of the Shuffle class's run() method:

.....
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
  kvIter = merger.close();
} catch (Throwable e) {
  throw new ShuffleError("Error while doing final merge " , e);
}
@Override
public RawKeyValueIterator close() throws Throwable {
  // Wait for on-going merges to complete
  if (memToMemMerger != null) {
    memToMemMerger.close();
  }
  inMemoryMerger.close();
  onDiskMerger.close();

  List<InMemoryMapOutput<K, V>> memory =
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  memory.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();
  List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
  onDiskMapOutputs.clear();
  return finalMerge(jobConf, rfs, memory, disk);
}

OK, with these operations finished, that is the whole Reduce-side Shuffle process.
For analysis of other parts of the code, see https://github.com/linyiqun/hadoop-yarn; analysis of other aspects of YARN will continue to be posted there.
Apache hadoop-2.7.1 (hadoop-mapreduce-project)
Copyright notice: this is an original article by the author and may not be reproduced without the author's permission.
YARN Source Code Analysis (8): The Reduce Shuffle Process
Original: http://blog.csdn.net/androidlushangderen/article/details/48880003