首页 > 其他 > 详细

Spark内核源码解析六:worker原理解析和源码解析

时间:2020-05-11 16:01:35      阅读:46      评论:0      收藏:0      [点我收藏+]

(原文此处为一张原理示意图,图片未能保留)

1、首先在Worker中找到处理LaunchDriver消息的分支:

  // Launch request for a driver: wrap it in a DriverRunner, register it, start it, and
  // reserve its cores and memory in this worker's accounting.
  case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      // Driver description whose command went through Worker.maybeUpdateSSLSettings
      // (defined elsewhere; presumably applies SSL options from `conf` — confirm).
      val launchDesc =
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf))
      // The runner manages the driver's execution; per the article it also restarts a
      // failed driver when supervision is requested.
      val runner = new DriverRunner(conf, driverId, workDir, sparkHome, launchDesc, self, akkaUrl)
      // Register before starting, so the DriverStateChanged handler can always find the
      // runner by id (it does `drivers.remove(driverId).get`), even if it finishes quickly.
      drivers(driverId) = runner
      runner.start()

      // Reserve the driver's resources; released again when its final state is reported.
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    }

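2、DriverRunner中的start方法:它启动一个独立线程,在线程中创建工作目录、下载用户jar、构建启动命令并调用launchDriver;driver结束后,再把driver的最终状态通过DriverStateChanged消息报告给Worker: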

 /**
  * Runs the driver on a new thread named "DriverRunner for <driverId>" and returns as soon
  * as that thread has been started.
  *
  * The thread creates a working directory, downloads the user jar into it, builds the
  * driver's process command from `driverDesc` (substituting the `{{WORKER_URL}}` and
  * `{{USER_JAR}}` placeholders) and passes it to `launchDriver` together with
  * `driverDesc.supervise`. When `launchDriver` returns or throws an `Exception`, the thread
  * derives the driver's final state, stores it in `finalState`, and reports it to the worker
  * with a `DriverStateChanged` message.
  */
 def start(): Unit = {
    new Thread("DriverRunner for " + driverId) {
      override def run(): Unit = {
        try {
          // Create the driver's working directory.
          val driverDir = createWorkingDirectory()
          // Download the user's jar into the working directory; the returned local path is
          // substituted for the {{USER_JAR}} placeholder below.
          val localJarFilename = downloadUserJar(driverDir)

          // Replaces placeholder arguments of the driver command with worker-local values.
          def substituteVariables(argument: String): String = argument match {
            case "{{WORKER_URL}}" => workerUrl
            case "{{USER_JAR}}" => localJarFilename
            case other => other
          }

          // TODO: If we add ability to submit multiple jars they should also be added here
          // Build the process from the driver's launch command and its memory setting
          // (no CPU information is passed here).
          val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
            sparkHome.getAbsolutePath, substituteVariables)
          // NOTE(review): presumably blocks until the driver process exits (relaunching it
          // when `supervise` is set), since the state computed below is always final — confirm.
          launchDriver(builder, driverDir, driverDesc.supervise)
        }
        catch {
          // Only Exceptions are recorded. An Error thrown above escapes run(), so no
          // DriverStateChanged message is sent and the worker never learns the driver ended.
          case e: Exception => finalException = Some(e)
        }

        // Final state precedence: killed, then a recorded exception, then the exit code
        // (only exit code 0 counts as FINISHED; a missing or non-zero code is FAILED).
        val state =
          if (killed) {
            DriverState.KILLED
          } else if (finalException.isDefined) {
            DriverState.ERROR
          } else {
            finalExitCode match {
              case Some(0) => DriverState.FINISHED
              case _ => DriverState.FAILED
            }
          }

        finalState = Some(state)

        // Report the driver's FINAL state (not its startup) to the worker.
        worker ! DriverStateChanged(driverId, state, finalException)
      }
    }.start()
  }

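3、Worker收到DriverStateChanged消息后(注意:该消息携带的是driver的最终状态,即KILLED、ERROR、FINISHED或FAILED,而不是"driver已启动"),会按状态记录日志,把消息转发给Master,然后将该driver从drivers移入finishedDrivers,并释放它占用的内存和CPU核数: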

 // Handles a driver state report (such as the one DriverRunner.start sends when a driver
 // ends): logs it according to the state, forwards the same message to the master, moves
 // the runner from `drivers` to `finishedDrivers`, and returns the driver's memory and cores
 // to the worker's accounting (undoing the `+=` done when the driver was launched).
 case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.ERROR =>
          // NOTE(review): `exception.get` assumes an ERROR report always carries an exception;
          // DriverRunner.start only reports ERROR when its finalException is defined.
          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
        case DriverState.FAILED =>
          logWarning(s"Driver $driverId exited with failure")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }
      // Forward the report unchanged to the master.
      master ! DriverStateChanged(driverId, state, exception)
      // NOTE(review): `.get` throws NoSuchElementException if `driverId` is not (or no longer)
      // in `drivers`, e.g. on a duplicate report — consider handling the missing case.
      val driver = drivers.remove(driverId).get
      finishedDrivers(driverId) = driver
      // Release the resources that were reserved for this driver at launch.
      memoryUsed -= driver.driverDesc.mem
      coresUsed -= driver.driverDesc.cores

 

Spark内核源码解析六:worker原理解析和源码解析

原文:https://www.cnblogs.com/xiaofeiyang/p/12869689.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!