
1、worker里面先找到launchDriver
case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") // 创建DriverRunner线程,包括在driver失败时自动重启driver val driver = new DriverRunner( conf, driverId, workDir, sparkHome, driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, akkaUrl) drivers(driverId) = driver driver.start() coresUsed += driverDesc.cores memoryUsed += driverDesc.mem }
DriverRunner里面的start方法
def start() = { new Thread("DriverRunner for " + driverId) { override def run() { try { // 创建工作目录 val driverDir = createWorkingDirectory() // 下载用户的jar包,下载用户jar包到工作目录,然后返回在worker中的路径 val localJarFilename = downloadUserJar(driverDir) def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl case "{{USER_JAR}}" => localJarFilename case other => other } // TODO: If we add ability to submit multiple jars they should also be added here // 构建processBuilder,传入Driver启动命令和需要的cpu和内存信息 val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) launchDriver(builder, driverDir, driverDesc.supervise) } catch { case e: Exception => finalException = Some(e) } val state = if (killed) { DriverState.KILLED } else if (finalException.isDefined) { DriverState.ERROR } else { finalExitCode match { case Some(0) => DriverState.FINISHED case _ => DriverState.FAILED } } finalState = Some(state) // driver启动后向worker发送driver启动的消息 worker ! DriverStateChanged(driverId, state, finalException) } }.start() }
worker接收到driver启动消息后会将消息发送给master
case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR => logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") case DriverState.FAILED => logWarning(s"Driver $driverId exited with failure") case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") case DriverState.KILLED => logInfo(s"Driver $driverId was killed by user") case _ => logDebug(s"Driver $driverId changed state to $state") } master ! DriverStateChanged(driverId, state, exception) val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver memoryUsed -= driver.driverDesc.mem coresUsed -= driver.driverDesc.cores
原文:https://www.cnblogs.com/xiaofeiyang/p/12869689.html