本文共 3894 字,大约阅读时间需要 12 分钟。
taskscheduler的submitTasks是通过TaskSchedulerImpl的submitTasks实现,stage由tasks组成,task被封装成taskset, override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized {#创建TaskSetManager,主要用于对Taskset中Task进行管理,包括调度,运行等. val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") }#将TaskSetManager 添加到schedulableBuilder schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true }#为Task分配运行资源 backend.reviveOffers() }这里的backend有很多实现,这里以CoarseGrainedSchedulerBackend 为例 override def reviveOffers() { driverEndpoint.send(ReviveOffers) }看看driverEndpoint是如何赋值的,其类型为 var driverEndpoint: RpcEndpointRef = null,赋值为 driverEndpoint = createDriverEndpointRef(properties) protected def createDriverEndpointRef( properties: ArrayBuffer[(String, String)]): RpcEndpointRef = { rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties)) } protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { new DriverEndpoint(rpcEnv, properties) }所以最终driverEndpoint是DriverEndpoint 这个类,这里使用 Spark 内部的 RPC 框架(RpcEnv;较新版本中已由 Akka 换成基于 Netty 的实现)
来发送消息,也就是说driverEndpoint.send发出的消息,最终会被 DriverEndpoint 的 receive 方法接收。class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { override def receive: PartialFunction[Any, Unit] = { case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } } case ReviveOffers =>#处理发送过来的ReviveOffers 消息 makeOffers()} private def makeOffers() { // Make sure no executor is killed while some task is launching on it val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { // Filter out executors under killing#所有激活的executors val activeExecutors = executorDataMap.filterKeys(executorIsAlive)#workOffers 表示Executor上的资源 val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores, Some(executorData.executorAddress.hostPort)) }.toIndexedSeq scheduler.resourceOffers(workOffers) } if (!taskDescs.isEmpty) {#启动task的运行,这些task会被提交到worker节点上,从这一步开始从driver走到worker上 launchTasks(taskDescs) } }
转载地址:http://ysnmi.baihongyu.com/