调度器调用JobTracker.initJob()函数对新作业进行初始化。相关代码如下:
//?调度器调用eagerTaskInitializationListener.start()方法。 class?JobQueueTaskScheduler?extends?TaskScheduler?{? ?@Override ??public?synchronized?void?start()?throws?IOException?{ super.start(); ...?... eagerTaskInitializationListener.start(); ...?... } } //?EagerTaskInitializationListener.start()方法启动作业管理器线程。 class?EagerTaskInitializationListener?extends?JobInProgressListener?{ ??...?... ??public?void?start()?throws?IOException?{ ????this.jobInitManagerThread?=?new?Thread(jobInitManager,?"jobInitManager"); ????...?... ????this.jobInitManagerThread.start(); ??} ??...?... } //?作业初始化管理器执行作业初始化动作。 class?JobInitManager?implements?Runnable?{ ????public?void?run()?{ ??????...?... ??????threadPool.execute(new?InitJob(job)); ??????...?... ????} } |
作业初始化的主要工作是构造Map?Task和Reduce?Task并对它们进行初始化。
Hadoop将每个作业分解成4种类型的任务,分别是Setup?Task、Map?Task、Reduce?Task和Cleanup?Task。它们的运行时信息由TaskInProgress类维护,因此,创建这些任务实际上是创建TaskInProgress对象。
上述4种任务的作用及创建过程如下。
n?Setup?Task:作业初始化标识性任务。它进行一些非常简单的作业初始化工作,比如将运行状态设置为“setup”,调用OutputCommitter.setupJob()函数等。该任务运行完后,作业由PREP状态变为RUNNING状态,并开始运行Map?Task。该类型任务又被分为Map?Setup?Task和Reduce?Setup?Task两种,且每个作业各有一个。它们运行时分别占用一个Map?slot和Reduce?slot。由于这两种任务功能相同,因此有且只有一个可以获得运行的机会(即只要有一个开始运行,另一个马上被杀掉,而具体哪一个能够运行,取决于当时存在的空闲slot种类及调度策略。相关代码如下:
public?class?JobInProgress?{ ??TaskInProgress?setup[]?=?new?TaskInProgress[0]; ??...?... ??public?synchronized?void?initTasks()?{ ????...?... ????//?create?two?setup?tips,?one?map?and?one?reduce. ????setup?=?new?TaskInProgress[2]; ????//?setup?map?tip.?This?map?doesn‘t?use?any?split.?Just?assign?an?empty ????//?split. ????setup[0]?=?new?TaskInProgress(jobId,?jobFile,?emptySplit,? ????????????jobtracker,?conf,?this,?numMapTasks?+?1,?1); ????setup[0].setJobSetupTask(); ????//?setup?reduce?tip. ????setup[1]?=?new?TaskInProgress(jobId,?jobFile,?numMapTasks, ???????????????????????numReduceTasks?+?1,?jobtracker,?conf,?this,?1); setup[1].setJobSetupTask(); ...?... ??} } |
?
n?Map?Task:Map阶段处理数据的任务。其数目及对应的处理数据分片由应用程序中的
InputFormat组件确定。关代码如下:
public?class?JobInProgress?{ ??TaskInProgress?maps[]?=?new?TaskInProgress[0]; ??...?... ??public?synchronized?void?initTasks()?{ ????//?read?input?splits?and?create?a?map?per?a?split TaskSplitMetaInfo[]?splits?=?createSplits(jobId); numMapTasks?=?splits.length; ????...?... ????maps?=?new?TaskInProgress[numMapTasks]; ????for(int?i=0;?i?<?numMapTasks;?++i)?{ ??????inputLength?+=?splits[i].getInputDataLength(); ??????maps[i]?=?new?TaskInProgress(jobId,?jobFile,?splits[i],? ???????????????????????????????????jobtracker,?conf,?this,?i,?numSlotsPerMap); } ...?... ??} } |
n?Reduce?Task:Reduce阶段处理数据的任务。其数目由用户通过参数mapred.reduce.tasks(默认数目为1)指定。考虑到Reduce?Task能否运行依赖于Map?Task的输出结果,因此,Hadoop刚开始只会调度Map?Task,直到Map?Task完成数目达到一定比例(由参数mapred.reduce.slowstart.completed.maps指定,默认是0.05,即5%)后,才开始调度Reduce?Task。关代码如下:
public?class?JobInProgress?{ ??TaskInProgress?reduces[]?=?new?TaskInProgress[0]; ??...?... ??public?synchronized?void?initTasks()?{ ????...?... ????//?Create?reduce?tasks ????this.reduces?=?new?TaskInProgress[numReduceTasks]; ????for?(int?i?=?0;?i?<?numReduceTasks;?i++)?{ ??????reduces[i]?=?new?TaskInProgress(jobId,?jobFile,?numMapTasks,?i,? ????????jobtracker,?conf,?this,?numSlotsPerReduce); ??????nonRunningReduces.add(reduces[i]); } ...?... } |
n?Cleanup?Task:作业结束标志性任务,主要完成一些清理工作,比如删除作业运行过程中用到的一些临时目录(比如_temporary目录)。一旦该任务运行成功后,作业由RUNNING状态变为SUCCESSED状态。关代码如下:
public?class?JobInProgress?{ ??TaskInProgress?cleanup[]?=?new?TaskInProgress[0]; ??...?... ??public?synchronized?void?initTasks()?{ ????...?... ????//?create?cleanup?two?cleanup?tips,?one?map?and?one?reduce. ????cleanup?=?new?TaskInProgress[2]; ????//?cleanup?map?tip.?This?map?doesn‘t?use?any?splits.?Just?assign?an?empty ????//?split. ????TaskSplitMetaInfo?emptySplit?=?JobSplit.EMPTY_TASK_SPLIT; ????cleanup[0]?=?new?TaskInProgress(jobId,?jobFile,?emptySplit,? ????????????jobtracker,?conf,?this,?numMapTasks,?1); ????cleanup[0].setJobCleanupTask(); ????//?cleanup?reduce?tip. ????cleanup[1]?=?new?TaskInProgress(jobId,?jobFile,?numMapTasks, ???????????????????????numReduceTasks,?jobtracker,?conf,?this,?1); cleanup[1].setJobCleanupTask(); ...?... } |
?
原文:http://seandeng888.iteye.com/blog/2162777