首页 > 其他 > 详细

大数据框架hadoop的作业初始化过程(接上编)

时间:2014-12-04 15:03:43      阅读:285      评论:0      收藏:0      [点我收藏+]
??? 本文接上一编文章《大数据框架hadoop的作业提交过程》。

调度器调用JobTracker.initJob()函数对新作业进行初始化。相关代码如下:

//?调度器调用eagerTaskInitializationListener.start()方法。

class?JobQueueTaskScheduler?extends?TaskScheduler?{?

?@Override

??public?synchronized?void?start()?throws?IOException?{

super.start();

...?...

eagerTaskInitializationListener.start();

...?...

}

}

//?EagerTaskInitializationListener.start()方法启动作业管理器线程。

class?EagerTaskInitializationListener?extends?JobInProgressListener?{

??...?...

??public?void?start()?throws?IOException?{

????this.jobInitManagerThread?=?new?Thread(jobInitManager,?"jobInitManager");

????...?...

????this.jobInitManagerThread.start();

??}

??...?...

}

//?作业初始化管理器执行作业初始化动作

class?JobInitManager?implements?Runnable?{

????public?void?run()?{

??????...?...

??????threadPool.execute(new?InitJob(job));

??????...?...

????}

}

作业初始化的主要工作是构造Map?TaskReduce?Task并对它们进行初始化。

Hadoop将每个作业分解成4种类型的任务,分别是Setup?TaskMap?TaskReduce?TaskCleanup?Task。它们的运行时信息由TaskInProgress类维护,因此,创建这些任务实际上是创建TaskInProgress对象。

上述4种任务的作用及创建过程如下。

n?Setup?Task:作业初始化标识性任务。它进行一些非常简单的作业初始化工作,比如将运行状态设置为“setup”,调用OutputCommitter.setupJob()函数等。该任务运行完后,作业由PREP状态变为RUNNING状态,并开始运行Map?Task。该类型任务又被分为Map?Setup?TaskReduce?Setup?Task两种,且每个作业各有一个。它们运行时分别占用一个Map?slotReduce?slot。由于这两种任务功能相同,因此有且只有一个可以获得运行的机会(即只要有一个开始运行,另一个马上被杀掉,而具体哪一个能够运行,取决于当时存在的空闲slot种类及调度策略。相关代码如下:

public?class?JobInProgress?{

??TaskInProgress?setup[]?=?new?TaskInProgress[0];

??...?...

??public?synchronized?void?initTasks()?{

????...?...

????//?create?two?setup?tips,?one?map?and?one?reduce.

????setup?=?new?TaskInProgress[2];

????//?setup?map?tip.?This?map?doesn‘t?use?any?split.?Just?assign?an?empty

????//?split.

????setup[0]?=?new?TaskInProgress(jobId,?jobFile,?emptySplit,?

????????????jobtracker,?conf,?this,?numMapTasks?+?1,?1);

????setup[0].setJobSetupTask();

????//?setup?reduce?tip.

????setup[1]?=?new?TaskInProgress(jobId,?jobFile,?numMapTasks,

???????????????????????numReduceTasks?+?1,?jobtracker,?conf,?this,?1);

setup[1].setJobSetupTask();

...?...

??}

}

?

n?Map?TaskMap阶段处理数据的任务。其数目及对应的处理数据分片由应用程序中的

InputFormat组件确定。关代码如下:

public?class?JobInProgress?{

??TaskInProgress?maps[]?=?new?TaskInProgress[0];

??...?...

??public?synchronized?void?initTasks()?{

????//?read?input?splits?and?create?a?map?per?a?split

TaskSplitMetaInfo[]?splits?=?createSplits(jobId);

numMapTasks?=?splits.length;

????...?...

????maps?=?new?TaskInProgress[numMapTasks];

????for(int?i=0;?i?<?numMapTasks;?++i)?{

??????inputLength?+=?splits[i].getInputDataLength();

??????maps[i]?=?new?TaskInProgress(jobId,?jobFile,?splits[i],?

???????????????????????????????????jobtracker,?conf,?this,?i,?numSlotsPerMap);

}

...?...

??}

}

n?Reduce?TaskReduce阶段处理数据的任务。其数目由用户通过参数mapred.reduce.tasks(默认数目为1)指定。考虑到Reduce?Task能否运行依赖于Map?Task的输出结果,因此,Hadoop刚开始只会调度Map?Task,直到Map?Task完成数目达到一定比例(由参数mapred.reduce.slowstart.completed.maps指定,默认是0.05,即5%)后,才开始调度Reduce?Task。关代码如下:

public?class?JobInProgress?{

??TaskInProgress?reduces[]?=?new?TaskInProgress[0];

??...?...

??public?synchronized?void?initTasks()?{

????...?...

????//?Create?reduce?tasks

????this.reduces?=?new?TaskInProgress[numReduceTasks];

????for?(int?i?=?0;?i?<?numReduceTasks;?i++)?{

??????reduces[i]?=?new?TaskInProgress(jobId,?jobFile,?numMapTasks,?i,?

????????jobtracker,?conf,?this,?numSlotsPerReduce);

??????nonRunningReduces.add(reduces[i]);

}

...?...

}

n?Cleanup?Task:作业结束标志性任务,主要完成一些清理工作,比如删除作业运行过程中用到的一些临时目录(比如_temporary目录)。一旦该任务运行成功后,作业由RUNNING状态变为SUCCESSED状态。关代码如下:

public?class?JobInProgress?{

??TaskInProgress?cleanup[]?=?new?TaskInProgress[0];

??...?...

??public?synchronized?void?initTasks()?{

????...?...

????//?create?cleanup?two?cleanup?tips,?one?map?and?one?reduce.

????cleanup?=?new?TaskInProgress[2];

????//?cleanup?map?tip.?This?map?doesn‘t?use?any?splits.?Just?assign?an?empty

????//?split.

????TaskSplitMetaInfo?emptySplit?=?JobSplit.EMPTY_TASK_SPLIT;

????cleanup[0]?=?new?TaskInProgress(jobId,?jobFile,?emptySplit,?

????????????jobtracker,?conf,?this,?numMapTasks,?1);

????cleanup[0].setJobCleanupTask();

????//?cleanup?reduce?tip.

????cleanup[1]?=?new?TaskInProgress(jobId,?jobFile,?numMapTasks,

???????????????????????numReduceTasks,?jobtracker,?conf,?this,?1);

cleanup[1].setJobCleanupTask();

...?...

}

?

大数据框架hadoop的作业初始化过程(接上编)

原文:http://seandeng888.iteye.com/blog/2162777

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!