继上篇《Spark源码分析之Job的调度模型与运行反馈》之后,我们继续来看第二阶段--Stage划分。
Stage划分的大体流程如下图所示:

前面提到,对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理。那么我们先来看下代码:
- private[scheduler] def handleJobSubmitted(jobId: Int,
- finalRDD: RDD[_],
- func: (TaskContext, Iterator[_]) => _,
- partitions: Array[Int],
- callSite: CallSite,
- listener: JobListener,
- properties: Properties) {
- var finalStage: ResultStage = null
-
-
- try {
-
-
-
- finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
- } catch {
- case e: Exception =>
- logWarning("Creating new stage failed due to exception - job: " + jobId, e)
- listener.jobFailed(e)
- return
- }
-
-
- val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
-
-
-
- clearCacheLocs()
-
-
- logInfo("Got job %s (%s) with %d output partitions".format(
- job.jobId, callSite.shortForm, partitions.length))
- logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
- logInfo("Parents of final stage: " + finalStage.parents)
- logInfo("Missing parents: " + getMissingParentStages(finalStage))
-
- val jobSubmissionTime = clock.getTimeMillis()
-
-
- jobIdToActiveJob(jobId) = job
-
-
- activeJobs += job
-
- finalStage.setActiveJob(job)
-
-
-
-
-
- val stageIds = jobIdToStageIds(jobId).toArray
-
- val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
-
- listenerBus.post(
- SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
-
-
- submitStage(finalStage)
-
-
- submitWaitingStages()
- }
这个handleJobSubmitted()方法一共做了这么几件事:
第一,调用newResultStage()方法,生成Stage,包括最后一个Stage:ResultStage和前面的Parent Stage:ShuffleMapStage;
第二,创建一个ActiveJob对象job;
第三,清除RDD分区位置缓存;
第四,调用logInfo()方法记录日志信息;
第五,维护各种数据对应关系涉及到的数据结构:
(1)将jobId-->ActiveJob的对应关系添加到HashMap类型的数据结构jobIdToActiveJob中去;
(2)将ActiveJob添加到HashSet类型的数据结构activeJobs中去;
第六,提交Stage;
下面,除了提交Stage留在第三阶段外,我们挨个分析第二阶段的每一步。
首先是调用newResultStage()方法,生成Stage,包括最后一个Stage:ResultStage和前面的Parent Stage:ShuffleMapStage。代码如下:
- private def newResultStage(
- rdd: RDD[_],
- func: (TaskContext, Iterator[_]) => _,
- partitions: Array[Int],
- jobId: Int,
- callSite: CallSite): ResultStage = {
-
-
- val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
-
-
-
-
- val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
-
-
- stageIdToStage(id) = stage
-
-
- updateJobIdStageIdMaps(jobId, stage)
-
-
- stage
- }
首先,根据fianl RDD获取parent stages及id,这个id为ResultStage的stageId;
其次,创建一个ResultStage,即为整个Job的finalStage;
然后,将stage加入到数据结构stageIdToStage中;
接着,更新数据结构jobIdToStageIds;
最后,返回这个ResultStage。
我们一步步来看。首先调用getParentStagesAndId()方法,根据fianl RDD获取parent stages及id,这个id为ResultStage的stageId。代码如下:
- private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
-
- val parentStages = getParentStages(rdd, firstJobId)
-
-
- val id = nextStageId.getAndIncrement()
-
-
- (parentStages, id)
- }
这个id即为下一个stageId,通过AtomicInteger类型的getAndIncrement()获得,能够保证原子性。继续分析getParentStages()方法,通过它来获取final RDD的parent stage。代码如下:
- private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
-
- val parents = new HashSet[Stage]
-
-
- val visited = new HashSet[RDD[_]]
-
-
-
-
- val waitingForVisit = new Stack[RDD[_]]
-
-
-
- def visit(r: RDD[_]) {
- if (!visited(r)) {
-
- visited += r
-
-
-
-
- for (dep <- r.dependencies) {
- dep match {
- case shufDep: ShuffleDependency[_, _, _] =>
-
- parents += getShuffleMapStage(shufDep, firstJobId)
- case _ =>
-
- waitingForVisit.push(dep.rdd)
- }
- }
- }
- }
-
-
- waitingForVisit.push(rdd)
-
-
- while (waitingForVisit.nonEmpty) {
-
- visit(waitingForVisit.pop())
- }
-
-
- parents.toList
- }
getParentStages()方法在其内部定义了如下数据结构:
parents:用HashSet存储parents stages,即finalRDD的所有parent stages,也就是ShuffleMapStage;
visited:用HashSet存储已经被访问过的RDD,在RDD被处理前先存入该HashSet,保证存储在里面的RDD将不会被重复处理;
waitingForVisit:存储需要被处理的RDD。Stack中得RDD都需要被处理。
getParentStages()方法在其内部还定义了一个visit()方法,传入一个RDD,如果之前没有处理过,标记为已处理,并循环此RDD的依赖关系dependencies,如果是ShuffleDependency,调用getShuffleMapStage()方法获取其parent stage;如果不是,则说明为同一stage,并压入Stack:waitingForVisit顶部,等待后续通过visit()方法处理。所以,getParentStages()方法从finalRDD开始,逐渐往上查找,如果是窄依赖,证明在同一个Stage中,继续往上查找,如果是宽依赖,通过getShuffleMapStage()方法获取其parent stage,就能得到整个Job中所有的parent stages,也就是ShuffleMapStage。
接下来,我们看下getShuffleMapStage()方法的实现。代码如下:
- private def getShuffleMapStage(
- shuffleDep: ShuffleDependency[_, _, _],
- firstJobId: Int): ShuffleMapStage = {
-
-
-
-
-
-
-
- shuffleToMapStage.get(shuffleDep.shuffleId) match {
- case Some(stage) => stage
- case None =>
-
-
-
-
- getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
-
-
- shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
- }
-
-
-
- val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
- shuffleToMapStage(shuffleDep.shuffleId) = stage
- stage
- }
- }
从getShuffleMapStage()方法的注释就能看出,这个方法的主要作用就是针对给定的shuffle dependency的map端,获取或者创建一个ShuffleMapStage。为何是Get or create呢?通过源码得知,getShuffleMapStage()方法首先会根据shuffleDep.shuffleId从数据结构shuffleToMapStage中查找哦是否存在对应的stage,如果存在则直接返回,如果不存在,则调用newOrUsedShuffleStage()方法创建一个Stage并添加到数据结构shuffleToMapStage中,方便后续需要使用此Stage者直接使用。在此之前,会根据入参ShuffleDependency的rdd发现还没有在shuffleToMapStage中注册的祖先shuffle dependencies,然后遍历每个ShuffleDependency,调用newOrUsedShuffleStage()方法为每个ShuffleDependency产生Stage并添加到数据结构shuffleToMapStage中。
下面,我们看下这个getAncestorShuffleDependencies()方法的实现,代码如下:
-
- private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
-
-
- val parents = new Stack[ShuffleDependency[_, _, _]]
-
-
- val visited = new HashSet[RDD[_]]
-
-
-
-
- val waitingForVisit = new Stack[RDD[_]]
-
-
- def visit(r: RDD[_]) {
- if (!visited(r)) {
- visited += r
-
-
- for (dep <- r.dependencies) {
- dep match {
- case shufDep: ShuffleDependency[_, _, _] =>
-
- if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
- parents.push(shufDep)
- }
- case _ =>
- }
-
-
- waitingForVisit.push(dep.rdd)
- }
- }
- }
-
-
- waitingForVisit.push(rdd)
-
- while (waitingForVisit.nonEmpty) {
- visit(waitingForVisit.pop())
- }
-
-
- parents
- }
通过代码我们可以发现,它和getParentStages()方法的代码风格非常相似。在其内部也定义了三个数据结构:
parents:存放parents的栈,即Stack,用于存放入参RDD的在shuffleToMapStage中未注册过的祖先shuffle dependencies;
visited:存放已经处理过的RDD的哈希表,即HashSet;
waitingForVisit:存放等待被处理的RDD的栈,即Stack;
定义了一个visit()方法,入参为RDD,针对传入的RDD,如果之前没有处理过则标记为已处理,并循环RDD的所有依赖,如果是如果是ShuffleDependency,并且其依赖的shuffleId在shuffleToMapStage中没有,添加到parents中,否则直接跳过,最后无论为何种Dependency,都将该dependence的rdd压入waitingForVisit栈顶部,等待后续处理。
接下来,我们再看下newOrUsedShuffleStage()方法,其代码如下:
- private def newOrUsedShuffleStage(
- shuffleDep: ShuffleDependency[_, _, _],
- firstJobId: Int): ShuffleMapStage = {
-
-
- val rdd = shuffleDep.rdd
-
-
- val numTasks = rdd.partitions.length
-
-
- val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
-
-
- if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
-
-
-
- val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
-
-
- val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
-
-
- (0 until locs.length).foreach { i =>
- if (locs(i) ne null) {
-
-
- stage.addOutputLoc(i, locs(i))
- }
- }
- } else {
-
-
-
-
- logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
-
-
-
- mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
- }
- stage
- }
这个方法的主要完成了以下两件事:
1、构造一个ShuffleMapStage实例stage;
2、判断是否在mapOutputTracker中存在:
(1)如果不存在,调用mapOutputTracker的registerShuffle()方法注册一个,注册的内容为根据shuffleDep获取的shuffleId和rdd中分区的个数;
(2)如果存在,根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象,反序列化后循环,逐个添加到stage中。
紧接着,看下newShuffleMapStage()方法,其代码如下:
- private def newShuffleMapStage(
- rdd: RDD[_],
- numTasks: Int,
- shuffleDep: ShuffleDependency[_, _, _],
- firstJobId: Int,
- callSite: CallSite): ShuffleMapStage = {
-
-
- val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
-
-
- val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
- firstJobId, callSite, shuffleDep)
-
-
- stageIdToStage(id) = stage
- updateJobIdStageIdMaps(firstJobId, stage)
- stage
- }
可以发现,这个方法也调用了getParentStagesAndId()方法,这样,就形成了一个递归,按照RDD的依赖关系,由后往前,逐渐生成Stage。代码剩余的部分就是创建一个ShuffleMapStage,并将stage加入到数据结构stageIdToStage,以及调用updateJobIdStageIdMaps()方法更新相关数据结构。这个updateJobIdStageIdMaps()方法留待下面分析。
下面,简单看下mapOutputTracker注册的代码。
- def registerShuffle(shuffleId: Int, numMaps: Int) {
-
-
- if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
- throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
- }
- }
很简单,将shuffleId、numMaps大小和MapStatus类型的Array数组的映射关系,放入mapStatuses中,mapStatuses为TimeStampedHashMap[Int, Array[MapStatus]]类型的数据结构。
经历了这多又长又大篇幅的叙述,现在返回newResultStage()方法,在通过getParentStagesAndId()方法获取parent stages及其result stage的id后,紧接着创建一个ResultStage,并将stage加入到stageIdToStage中,最后在调用updateJobIdStageIdMaps()更新数据结构jobIdToStageIds后,返回stage。
下面,简单看下updateJobIdStageIdMaps()方法。代码如下:
- private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
-
- def updateJobIdStageIdMapsList(stages: List[Stage]) {
-
- if (stages.nonEmpty) {
-
-
- val s = stages.head
-
-
- s.jobIds += jobId
-
-
- jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
-
- val parents: List[Stage] = getParentStages(s.rdd, jobId)
-
- val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
- updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
- }
- }
-
- updateJobIdStageIdMapsList(List(stage))
- }
这个方法的实现比较简单,在其内部定义了一个函数updateJobIdStageIdMapsList(),首选传入result stage,将jobId添加到stage的jobIds中,更新jobIdToStageIds,将jobId与stageIds的对应关系添加进去,然后根据给定stage的RDD获取其parent stages,过滤出不包含此JobId的parents stages,再递归调用updateJobIdStageIdMapsList()方法,直到全部stage都处理完。
至此,第二阶段Stage划分大体流程已分析完毕,有遗漏或不清楚的地方,以后再查缺补漏以及细化及更正错误。
博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50674189
Spark源码分析之三:Stage划分
原文:http://www.cnblogs.com/jirimutu01/p/5274456.html