主要内容
1. Task执行原理流程图
2. Task执行源码
3. Task执行结果在Driver端的处理
一、Task在Executor(worker)端执行及返回Driver流程图
图37-1 Driver端与Executor交互图
当Driver中的SchedulerBackend给ExecutorBackend发送LaunchTask之后,ExecutorBackend在接收到LaunchTask消息后,首先反序列化TaskDescription。
& StandAlone下为SchedulerBackend具体指CoarseGrainedSchedulerBackend,ExecutorBackend指CoarseGrainedExecutorBackend。
//CoarseGrainedExecutorBackend#receive
case LaunchTask(data) =>
if (executor == null) {
//如果不存在Executor则会报错,退出系统
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
//反序列化Task,得到TaskDescription信息
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
//调用executor#launchTask在executor上加载任务
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
Executor会通过launchTask来执行Task。
//Executor#launchTask
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
//实例化一个TaskRunner对象来执行Task
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
//将Task加入到正在运行的Task队列
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
class TaskRunner(
execBackend: ExecutorBackend,
val taskId: Long,
val attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer)
extends Runnable {//省略非关键代码
override def run(): Unit = {
//为我们的Task创建内存管理器
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
//记录反序列化时间
val deserializeStartTime = System.currentTimeMillis()
//加载具体类时需要用到ClassLoader
Thread.currentThread.setContextClassLoader(replClassLoader)
//创建序列化器
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
//调用ExecutorBackend#statusUpdate向Driver发信息汇报当前状态
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
//记录运行时间和GC信息
var taskStart: Long = 0
startGCTime = computeTotalGcTime()
try {
//反序列化Task的依赖,得到的结果中有taskFile(运行的文件),taskJar(环境依
//赖),taskBytes(相当于缓冲池)
val (taskFiles, taskJars, taskBytes) =
Task.deserializeWithDependencies(serializedTask)
//下载Task运行缺少的依赖。
updateDependencies(taskFiles, taskJars)
//反序列化Task
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
//设置Task运行时的MemoryManager
task.setTaskMemoryManager(taskMemoryManager)
//如果Task在序列化前就已经被killed,则会抛出异常;否则,正常执行
if (killed) {
throw new TaskKilledException
}
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
//运行的实际任务,并测量它的运行时间。
taskStart = System.currentTimeMillis()
var threwException = true
val (value, accumUpdates) = try {
//调用task#run方法,得到task运行的结果
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
} finally {
//清理所有分配的内存和分页,并检测是否有内存泄漏
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
}
//记录Task完成时间
val taskFinish = System.currentTimeMillis()
//如果Task killed,则报错。
if (task.killed) {
throw new TaskKilledException
}
//否则序列化得到的Task执行的结果
val resultSer = env.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
//记录相关的metrics
for (m <- task.metrics) {
m.setExecutorDeserializeTime(
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
m.updateAccumulators()
}
//创建直接返回给Driver的结果对象DirectTaskResult
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
val serializedResult: ByteBuffer = {
//对直接返回的结果对象大小进行判断
if (maxResultSize > 0 && resultSize > maxResultSize) {
//大于最大限制1G,直接丢弃ResultTask
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
//结果大小大于设定的阀值,则放入BlockManager中
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
//返回非直接返回给Driver的对象TaskResultTask
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
//结果不大,直接传回给Driver
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}
//通知Driver Task已完成
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} //省略备份代码
finally {//将Task从运行队列中去除
runningTasks.remove(taskId)
}
}
Executor会通过TaskRunner在ThreadPool中来运行具体的Task;在TaskRunner的run方法中会通过调用statusUpdate方法给Driver发信息汇报自己的状态。告诉Driver,Task已经开始运行了(Running状态)。
TaskRunner内部会有一些准备工作,例如反序列化Task的依赖,然后通过网络来获取需要的文件、Jar等信息。
& 补充:在执行具体的Task的业务逻辑前会进行四次反序列化:1.TaskDescription反序列化 2,Task的反序列化 3,RDD的反序列化 4,反序列化依赖
调用反序列化后的Task的run方法来获得任务执行的结果;
final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem)
: (T, AccumulatorUpdates) = {
//创建Task执行的上下文
context = new TaskContextImpl(
stageId,
partitionId,
taskAttemptId,
attemptNumber,
taskMemoryManager,
metricsSystem,
internalAccumulators,
runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
try{ //调用runTask执行Task
(runTask(context), context.collectAccumulators())
}//省略部分代码
}
}
不同Task类型对抽象方法runTask的实现不同,ShuffleMapTask#runTask方法会调用RDD的iterator()方法来计算Task;事实上,其内部会迭代Partition的元素并利用我们自定的function来进行计算。ResultTask计算过程与之类似。不同的是ShuffleMapTask#runTask在计算具体的partition后实际上会通过shuffleManager获取shufflewriter把当前Task的计算结果根据具体的shuffleManager的实现来写入到具体的文件中,操作结束后会把MapStatus发送给DAGScheduler;而ResultTask#runTask会根据前面Stage的执行结果进行Shuffle产生整个Job最后的结果。
& 对于ShuffleMapTask,首先需要对RDD以及其依赖关系进行反序列化;最终计算(不考虑cache和checkpoint)时,会调用RDD#compute方法
& def compute(split: Partition, context: TaskContext): Iterator[T]
& 具体计算时有具体的RDD,例如MapPartitionsRDD的compute;
& override def
compute(split:Partition, context: TaskContext):
Iterator[U] =
f(context, split.index, firstParent[T].iterator(split,context))
& 这里的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码;来源自对我们在该Stage的各个算子中自定义的函数的合并。
并根据序列化后的DirectResultTask的大小选择不同的方式将结果传回给Driver端。
& 若果结果大于1G(可以通过spark.driver.maxResultSize来进行设置),直接丢弃
& 如果结果“较大”(小于1G但大于一个阀值(akkaFrameSize-AkkaUtils.reservedSizeBytes),在Spark1.6中akkaFrameSize默认为128MB,此时阀值为conf.getInt("spark.akka.frameSize",128)*1024*1024-200*1024),则会放入BlockManager中。(env.blockManager.putBytes(
blockId,serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER))
& 如果结果不大,则直接传回给Driver。
调用ExecutorBackend#statusUpdate方法给Driver发信息汇报自己的状态。告诉Driver,Task已经开始完成了(FINISHED状态)。
由上一节,可以看出Task在Executor执行完成时,会通过向Driver发送StatusUpdate的消息来通知Driver任务的状态更新为TaskState.FINISHED。
//ExecutorBackend(CoarseGrainedBackend)#stateUpdate
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
//通知Driver
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
//SchedulerBackend(CoarseGrainedBackend).DriverEndpoint#receive
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value) //1
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
//增加这个Executor的可用CPU数
executorInfo.freeCores += scheduler.CPUS_PER_TASK
//重新为这个Executor分配资源
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
Driver首先会将任务的状态更新通知给TaskScheduler,然后会在这个Executor上重新分配新的计算任务。(见1)
//TaskSchedulerImpl#statusUpdate (1处被调用)
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
//如果Task的状态是FINISHED或者FAILED或者KILLED或者LOST
//都是为Task执行结束,清理本地的Task数据结构
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { execId =>
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
}
}
if (state == TaskState.FINISHED) {
//任务完成TaskSetManager标记该任务已经结束,此时Task不一定成功结束
taskSet.removeRunningTask(tid)
//处理任务的计算结果
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) //2
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
//从TaskSetManager的运行Task队列中去除标记为完成的Task,此时Task不一定
//是成功执行结束。
taskSet.removeRunningTask(tid)
//处理失败的情况
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)//3
}
}
& 执行结束状态包括了FINISHED, FAILED, KILLED, LOST 这四种状态,所以标记为执行结束的Task并非是成功执行Task结束的。val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
& 这里Task的状态只有是FINISHED的时候才是成功执行Task结束的标志,其余的状态例如:FAILED、KILLED和LOST都是Task执行失败的标志。
/**
* 利用一个线程池来反序列化Task执行结果或者在必要是抓取Task结果。
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends Logging {
//设置线程池内线程数,可配置通过spark.resultGetter.threads
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
THREADS, "task-result-getter")
//设置序列化器
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
sparkEnv.closureSerializer.newInstance()
}
}
//定义Task成功执行得到的结果的处理逻辑,?处被调用
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
//通过线程池来获取Task执行的结果
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
//如果是直接发送到Driver端的Task执行结果,未利用BlockManager即Executor
//发送Task的最后一种情况,考参照Executor端执行步骤9,判断传回Driver的方
//式
case directResult: DirectTaskResult[_] =>
//不符合抓取Task的大小限制
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
//这里反序列化的值是不加锁的,这主要是为了保证多线程对其访问时,不会出现
//其他线程因为本线程而堵塞的情况,这里我们应该先调用它,获得反序列化的
//值,以便在TaskSetManager#handleSuccessfulTask中再次调用时,不需要再次
//反序列化该值
directResult.value()
//得到Task执行的结果,由于是directResult,所以不需要远程读取。
(directResult, serializedData.limit())
//如果Executor返回给Driver的Task执行结果是间接的,需要借助BlockManager
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
// 如果结果大小比maxResultSize,则在远程节点上(worker)删除该
//blockManager
sparkEnv.blockManager.master.removeBlock(blockId)
return
}
//需要从远程节点上抓取Task执行的结果
logDebug("Fetching indirect task result for TID %s".format(tid))
//标记Task为需要远程抓取的Task并通知DAGScheduler
scheduler.handleTaskGettingResult(taskSetManager, tid)
//从远程节点的BlockManager上获取Task计算结果
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
if (!serializedTaskResult.isDefined) {
//在Task执行结束获得结果后到driver远程去抓取结果之间,如果运行task的
//机器挂掉,或者该机器的BlockManager已经刷新掉了Task执行结果,都会导致
//远程抓取结果失败,即结果丢失。
scheduler.handleFailedTask(
taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
return
}
//远程抓取结果成功,反序列化结果
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
//删除远程的结果
sparkEnv.blockManager.master.removeBlock(blockId)
//得到IndirectResult类型的结果
(deserializedResult, size)
}
result.metrics.setResultSize(size)
//标记Task为SuccessfulTask并通知DAGScheduler
scheduler.handleSuccessfulTask(taskSetManager, tid, result) //4
} catch {//省略部分非关键代码
}
}
})
}
}
& 这里TaskScheduler获得方式结果主要是依据Driver端得到Executor端返回的Task运行结果确定的,有两种方式1)DirectResult,2)InDirectResult。对于1)直接可以反序列化Driver端接到的返回信息得到Task运行结果;对于2)则需要借助远程节点(worker)上的BlockManager来远程获取结果。
//4处被调用
def handleSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]): Unit = synchronized {
taskSetManager.handleSuccessfulTask(tid, taskResult)
}
//TaskSetManager#hadleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
info.markSuccessful()
removeRunningTask(tid)
//向高层调度器报告结果 5
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
//判断该TaskSet中Task是否已全部执行成功
if (!successful(index)) {//该Task还未标记为成功执行
//增加执行成功的Task
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
// 标记执行成功的Task,如果TaskSet中的所有Task执行成功则停止该TaskSetManager
successful(index) = true
if (tasksSuccessful == numTasks) {
isZombie = true
}
} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
//从执行失败的集合中删去该Task,用于Task失败重试
failedExecutors.remove(index)
//判断TaskSet中Task是否已全部执行完成,是则说明该TaskSet已执行完成,相应的对该
//TaskSetManager的调度结束,从调度池中删除该TaskSetManager
maybeFinishTaskSet()
}
//DAGScheduler#taskEnd 5处被调用
def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics): Unit = {
//加入DAGScheduler的消息队列,等待处理
eventProcessLoop.post(
CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}
//DAGScheduler#doOnReceive
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
(1)处理ShuffleMapTask
对于ShuffleMapTask,首先需要将结果保存到Stage,如果当前Stage所有Task都结束了,则将所有的结果注册到MapOutputTrackerMaster;这样下一个Stage的Task就可以通过他来获取Shuffle的结果原数据信息,进而从Shuffle数据所在的节点获取数据了。
//DAGScheduler#handleTaskCompletion
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,event.reason, event.taskInfo, event.taskMetrics))
//从该stage中等待处理的partition中去除Task对应的partition
stage.pendingPartitions -= task.partitionId
task match {
//如果是ShuffleMapTask
case smt: ShuffleMapTask =>
//实例化一个shuffleStage实例,用来保存TaskSet结果
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
//跟新本地的状态信息
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
//忽略在集群中游走的ShuffleMapTask(来自一个失效的节点的Task结果)。
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
//将结果保存到Stage中。即将Task结果的输出位置放到Stage的数据结构中。
shuffleStage.addOutputLoc(smt.partitionId, status)
}
//如果当前Stage运行完毕
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
//标记当前Stage为Finished,并将其从运行中Stage列表中删除
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
//将整体结果注册到MapOutputTrackerMaster;
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
//清除本地缓存
clearCacheLocs()
//如果shuffleMapStage中有一些tasks运行失败,没有结果。
if (!shuffleStage.isAvailable) {
//则需要重新提交这个shuffleMapStage,并且需要告知顶层调度器TaskScheduler
//进行处理。
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
// 标记所有等待这个Stage结束的Map-Stage Job为结束状态
//这里会将这个Job记为Finished状态,并统计输出结果,报告给监听器
if (shuffleStage.mapStageJobs.nonEmpty) {
val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
for (job <- shuffleStage.mapStageJobs) {
markMapStageJobAsFinished(job, stats)
}
}
}
& 从ActiveJob类的注释可以看出,Job可以有两种类型:result job,这会触发ResultStage执行的action操作,或Map-stage Job,在任何下游Stage提交之前计算出其所需的前一个Stage的结果,并对ShuffleMapStage的结果进行映射。后者用于自适应查询计划,用于在提交后期stage之前可以查看上有Stage输出结果的统计信息(如一些结果位置的元数据信息)。我们使用这个类的finalStage字段来对两种类型的Job进行区分。对于Map-Stage会借助MapOutputTracker来映射上游的Stage的Task输出信息,来实现前一个Stage输出信息的位置等元信息传递给后一个Stage的过程;并直接标记Map-Stage Job结束,并报告输出统计信息给监听器。
& ActiveJob类中会记录Job所需计算的分片(Partition)数目,以及每个Partition是否计算完成。由于Task与Partition是一一对应的,所以我们从这个类中可以知道有多少个Task,与Task执行完成的个数。
(2)处理ResultTask
首先,MapOutputTracker会把ShuffleMapTask执行结果交给ResultTask,然后,ResultTask会根据前面Stage的执行结果进行Shuffle产生整个Job最后的结果。
case rt: ResultTask[_, _] =>
实例化一个ResultStage来存储resultTask
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
//如果这个Task未执行完成
if (!job.finished(rt.outputId)) {
//更新当前状态
updateAccumulators(event)
//将Task所对应的Partition标记为计算完成
job.finished(rt.outputId) = true
//当前作业中Partition完成数增加
job.numFinished += 1
// 如果当前的Job所有Partition对已计算完成,就将这个Stage remove掉
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
//在处理SucceedTask时,会调用一些用户定义的函数,可能会产生异常,为
//了确保程序的健壮性,需要进行异常处理。
try {
job.listener.taskSucceeded(rt.outputId, event.result)//⑥
} catch {
case e: Exception =>
//当异常发生时,有时需要标记ResultStage失败。
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
}
case None =>
//在任务进行推测执行时,可能有多个Task的执行结果,对于对于的结果,系统
//会进行忽略处理。
logInfo("Ignoring result from " + rt + " because its job has finished")
//⑥处被调用 JobListener.scala
private[spark] trait JobListener {
//对Task执行结果进行处理的核心逻辑
def taskSucceeded(index: Int, result: Any) //⑦
//对Task执行失败进行处理的而核心逻辑。
def jobFailed(exception: Exception)
}
//对父类trait JobListener中的抽象方法的具体实现
//JobWaiter#taskSucceed 对⑦抽象方法的实现
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
//如果当前Job处理已完成,说明Task进行了重复处理,则会报错。
if (_jobFinished) {
throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
}
//调用用户逻辑,即用户定义的处理方法,来处理Task的结果
resultHandler(index, result.asInstanceOf[T])
//记录当前Job的Task完成数增加
finishedTasks += 1
//如果当前Job的所有Task都已执行完毕,则表明整个Job完成。
if (finishedTasks == totalTasks) {
_jobFinished = true
jobResult = JobSucceeded
//通知所有调用JobWaiter#awaitResult的方法,Job执行完成,可以继续运行了。
this.notifyAll()
}
}
& 在DAGScheduler#JobSubmit中,会得到JobWaiter类的实例waiter,从而获得Job的执行结果。最终在DAGScheduler#runJob中,调用waiter#awaitResult,对JobSuceeded进行报告,并写入日志。Job执行就结束了。
对于出错或是执行失败的Task,TaskSchedulerImpl#statsUpdate会调用TaskResultGetter#enqueueFailedTask来处理。这个处理过程与执行成功的Task的处理过程是类似的,它们(执行成功和执行失败的Task)会是公用一个线程池来执行处理逻辑。
// TaskResultGetter# enqueueFailedTask定义Task执行失败的处理逻辑,3处被调用
//这部分可以理解为Scheduler的容错功能。
def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
serializedData: ByteBuffer) {
//记录执行失败的原因
var reason : TaskEndReason = UnknownReason
try {
//调用线程池的一个线程来处理。
getTaskResultExecutor.execute(new Runnable { //具体处理逻辑
override def run(): Unit = Utils.logUncaughtExceptions {
val loader = Utils.getContextOrSparkClassLoader
try {
//如果是序列化结果为空或是序列化结果大于规定值,则是序列化失败导致Task执行
//失败。
if (serializedData != null && serializedData.limit() > 0) {
reason = serializer.get().deserialize[TaskEndReason](
serializedData, loader)
}
} catch {//序列化过程中抛出异常。
case cnd: ClassNotFoundException =>
//由于Task执行失败并非致命性错误,所以这里只需将信息记录到日志里之后,仍然
//可以继续执行程序
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
case ex: Exception => {}
}
//调用TaskSchulerImpl#handleFailedTask来处理Task失败,该方法中定义了处理
//Task失败的核心逻辑。
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) //⑧
}
})
} catch {//处理SparkContext已关闭异常
case e: RejectedExecutionException if sparkEnv.isStopped =>
// ignore it
}
}
//TaskScheduler#hadleFailTask ⑧处调用
def handleFailedTask(
taskSetManager: TaskSetManager,
tid: Long,
taskState: TaskState,
reason: TaskEndReason): Unit = synchronized {
//调用TaskSetManager处理失败的情况。
taskSetManager.handleFailedTask(tid, taskState, reason) //⑨
if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
//这里需要重新进行资源调度来执行失败的Task,而失败的Task的状态(例如执行失败次数
//等)已由TaskManager进行了更新,来反应该任务是失败后重新执行的Task
backend.reviveOffers()
}
}
TaskSetManager#handlerFailTask方法主要是将任务标记为失败,并将它重新添加到待处理任务列表,同时通知高层调度器DAG Scheduler。
//TaskSetManager#handlerFailTask ⑨处调用
def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
//省略部分非关键代码,这些代码主要处理一些出错信息,并根据不同的出错信息做一些日志记
//录操作。
//如果Executor failed,则尝试重新加入这些Executor。
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTimeMillis())
//调用高层调度器(DAGScheduler)进行容错 //⑩
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
//将Task加入到待处理任务列表
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
//这里的countTowardTaskFailures指,是否允许在Stage被丢弃前,执行最大次数的
//Task失败重试。只有当Task的执行失败与Task本身无关时,才会设置为false(例
//如,执行Task的Executor挂掉了)。
&& reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
assert (null != failureReason)
numFailures(index) += 1
//如果失败次数大于最大失败次数,则将Task丢弃。
if (numFailures(index) >= maxTaskFailures) {
logError("Task %d in stage %s failed %d times; aborting job".format(
index, taskSet.id, maxTaskFailures))
abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
.format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
return
}
}
//判断TaskSet中Task是否已全部执行完成,是则说明该TaskSet已执行完成,相应的对该
//TaskSetManager的调度结束,从调度池中删除该TaskSetManager
maybeFinishTaskSet()
}
给高层发送消息,调用高层容错机制。
//DAGScheduler#taskEnd
def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics): Unit = {
eventProcessLoop.post(
CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}
与执行成功的Task一样,向高层调度器DAGScheduler发送的是由CompletionEvent封装的消息。而DAGScheduler会接收到这个消息,对其进行容错处理。
//DAGScheduler#doOnReceive
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
这里是否有似曾相识的感觉,其实步骤6中也有这一过程,再DAGScheduler#handleTaskCompletion中,会根据不同的event#reason,也就是出错信息,进行处理。主要处理的有重复提交Resubmitted和远程读取失败FetchFailed,而其他出错情况则大都采用鸵鸟政策,什么也不做。这边是高层DAGScheduler的容错处理。
& 通过对Driver端执行的过程的观察,我们可以看出底层调度器和高层调度器是紧密合作的,很多时候,在接收到Worker端的StateUpdate信息后,先由TaskSchedulerImpl进行处理,然后同时底层调度器,将这些信息报告给高层调度器,就通信过程来看,真正与Worker联系的是底层调度器,这是在Task层次上的;而底层调度其会将这些信息进行加工,向高层调度器报告,这是联系的内容大都是TaskSetManager,所以这是就是在TaskSetManager层次上进行处理的。所以我们可以看到底层和高层进行处理时,所处的层次是不一样的,这也就是为什么会划分两个调度器的原因了。
& 对于容错,底层调度器和高层调度器也是合作进行的,所以Task在出错时,会进行两个层次上的容错处理,这就大大提交了容错的效率和可靠性。
说明:
本文是由DT大数据梦工厂的IFM课程第37课为基础上,加入一些参考资料所做的笔记
原文:http://blog.csdn.net/sinat_25306771/article/details/51451908