Spark Job Submission (4)
Published: 2019-05-26


The TaskScheduler's `submitTasks` is implemented by `TaskSchedulerImpl.submitTasks`. A stage is made up of tasks, and those tasks are wrapped into a `TaskSet`:

```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager, which manages the tasks in this TaskSet:
    // scheduling, running, retrying on failure, and so on.
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Add the TaskSetManager to the schedulableBuilder (the scheduling pool)
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Ask the backend to allocate resources on which the tasks can run
  backend.reviveOffers()
}
```

`backend` here has several implementations; we take `CoarseGrainedSchedulerBackend` as the example:

```scala
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
```

Let's look at how `driverEndpoint` is assigned. Its type is:

```scala
var driverEndpoint: RpcEndpointRef = null
```

and it is assigned as follows:

```scala
driverEndpoint = createDriverEndpointRef(properties)

protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
```

So `driverEndpoint` ultimately refers to an instance of the `DriverEndpoint` class. Messages travel through Spark's RPC layer (Akka-based in older Spark versions, a Netty-based `RpcEnv` since 2.0), which means a `driverEndpoint.send` is eventually handled by `DriverEndpoint.receive`:

```scala
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    extends ThreadSafeRpcEndpoint with Logging {

  override def receive: PartialFunction[Any, Unit] = {
    case StatusUpdate(executorId, taskId, state, data) =>
      scheduler.statusUpdate(taskId, state, data.value)
      if (TaskState.isFinished(state)) {
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.freeCores += scheduler.CPUS_PER_TASK
            makeOffers(executorId)
          case None =>
            // Ignoring the update since we don't know about the executor.
            logWarning(s"Ignored task status update ($taskId state $state) " +
              s"from unknown executor with ID $executorId")
        }
      }

    // Handle the ReviveOffers message sent by reviveOffers()
    case ReviveOffers =>
      makeOffers()
  }

  private def makeOffers() {
    // Make sure no executor is killed while some task is launching on it
    val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
      // Filter out executors under killing: keep only the alive executors
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      // Each WorkerOffer describes the free resources on one executor
      val workOffers = activeExecutors.map {
        case (id, executorData) =>
          new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
            Some(executorData.executorAddress.hostPort))
      }.toIndexedSeq
      scheduler.resourceOffers(workOffers)
    }
    if (!taskDescs.isEmpty) {
      // Launch the tasks: from this point on, execution moves
      // from the driver to the worker nodes
      launchTasks(taskDescs)
    }
  }
}
```
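The offer-building step of `makeOffers()` can be illustrated with a minimal, self-contained sketch. This is not Spark's actual code: `WorkerOffer` here is a simplified stand-in for Spark's class (the real one also carries the executor's address), the executor map uses hypothetical data, and `executorIsAlive` is modeled as a plain boolean flag.

```scala
// Simplified stand-in for Spark's WorkerOffer
case class WorkerOffer(executorId: String, host: String, freeCores: Int)

object MakeOffersSketch {
  // Hypothetical executor state: (host, freeCores, alive);
  // "alive" models what executorIsAlive checks in the real code.
  val executorDataMap: Map[String, (String, Int, Boolean)] = Map(
    "exec-1" -> ("host-a", 4, true),
    "exec-2" -> ("host-b", 2, false), // being killed, so it must be filtered out
    "exec-3" -> ("host-c", 8, true)
  )

  // Mirrors makeOffers(): keep only alive executors and
  // describe each one's free resources as an offer.
  def makeOffers(): IndexedSeq[WorkerOffer] =
    executorDataMap.collect {
      case (id, (host, cores, alive)) if alive => WorkerOffer(id, host, cores)
    }.toIndexedSeq

  def main(args: Array[String]): Unit = {
    val offers = makeOffers()
    println(offers.map(_.executorId).sorted.mkString(",")) // prints "exec-1,exec-3"
  }
}
```

In the real scheduler these offers are then handed to `TaskScheduler.resourceOffers`, which matches pending tasks against the free cores in each offer.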


Reposted from: http://ysnmi.baihongyu.com/
