Spark Source Code Analysis -- SchedulableBuilder
Published: 2019-06-22


SchedulableBuilder encapsulates the Schedulable tree:

At the Pool level (inner nodes), it schedules TaskSets (FIFO or FAIR).
At the TaskSetManager level (leaf nodes), it schedules the tasks inside a TaskSet (by locality) and tracks them (retries).

TaskSetManager

Wraps a TaskSet. Its main job is to track and schedule the tasks inside a single TaskSet.

So the main interfaces are:
resourceOffer: given a resource offer, decide which task to schedule on it
statusUpdate: track changes in task state

/**
 * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
 * each task and is responsible for retries on failure and locality. The main interfaces to it
 * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
 * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
 * (e.g. its event handlers). It should not be called from other threads.
 */
private[spark] trait TaskSetManager extends Schedulable {
  def schedulableQueue = null

  def schedulingMode = SchedulingMode.NONE

  def taskSet: TaskSet

  def resourceOffer(
      execId: String,
      host: String,
      availableCpus: Int,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription]

  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)

  def error(message: String)
}

 

ClusterTaskSetManager

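The TaskSetManager implementation used by ClusterScheduler.

1 addPendingTask

Locality has to be taken into account when scheduling: the task that is as local as possible should run first.
Every task that has not run yet is a pending task, and pending tasks are stored in hash maps keyed by locality granularity:
pendingTasksForExecutor, HashMap: the tasks assigned to each executor
pendingTasksForHost, HashMap: the tasks assigned to each host
pendingTasksForRack, HashMap: the tasks assigned to each rack
pendingTasksWithNoPrefs, ArrayBuffer: tasks with no locality preferences, which can run anywhere
allPendingTasks, ArrayBuffer: all pending tasks
speculatableTasks, HashSet: tasks that may be launched as duplicate (speculative) copies, which should be familiar to anyone who knows Hadoop
Next, look at addPendingTask to see how a task is added to each list.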

addPendingTask(index: Int, readding: Boolean = false)

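Two parameters:
index: the index of the task, used to fetch the task from the task set
readding: true when the task is being re-added rather than added for the first time. When an executor fails, its tasks have to be put back into the lists. Stale or duplicate entries in the lists are harmless, because task selection automatically skips tasks that are already running.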
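The lazy cleanup described above can be sketched roughly as follows. This is a simplified stand-in, not the Spark code: the object name, the `launched` set (standing in for the real running/finished bookkeeping), and the reduced parameter list are all assumptions made for illustration.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// Simplified sketch of the pending-task lists; all names here are illustrative.
object PendingListsSketch {
  val pendingForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingForHost = new HashMap[String, ArrayBuffer[Int]]
  val allPending = new ArrayBuffer[Int]
  // Hypothetical stand-in for the "already running or finished" checks.
  val launched = new HashSet[Int]

  def addPendingTask(index: Int, execId: String, host: String,
                     readding: Boolean = false): Unit = {
    // Add to a list unless we are re-adding and the entry is already there.
    def addTo(list: ArrayBuffer[Int]): Unit =
      if (!readding || !list.contains(index)) list += index
    addTo(pendingForExecutor.getOrElseUpdate(execId, new ArrayBuffer[Int]))
    addTo(pendingForHost.getOrElseUpdate(host, new ArrayBuffer[Int]))
    if (!readding) allPending += index
  }

  // Pop from the end of the list (stack order), dropping stale entries
  // for tasks that were already launched from some other list.
  // For brevity the sketch marks the task as launched right here.
  def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
    while (list.nonEmpty) {
      val index = list.remove(list.length - 1)
      if (!launched.contains(index)) {
        launched += index
        return Some(index)
      }
    }
    None
  }
}
```

A task launched from the executor list stays in the host list and in the all-tasks list, and is simply discarded the next time one of those lists is scanned.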

 

2 resourceOffer

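Decides how to schedule a task within the task set. The main concern is locality; see the comments in the code.
The interesting part is how currentLocalityIndex is maintained.
It starts at 0, the most local level in myLocalityLevels (typically PROCESS_LOCAL), where only tasks from pendingTasksForExecutor (plus tasks with no preferences) can be chosen.
Every call to resourceOffer computes the time elapsed since the previous task launch. If that exceeds the wait for the current level (each locality level has its own timeout), currentLocalityIndex is incremented, so the constraint is relaxed step by step.
lastLaunchTime, which represents the previous launch, is set to the current time only when findTask succeeds inside resourceOffer (getAllowedLocalityLevel merely advances it by the wait that was already used up). So the logic is: prefer the more local tasks, but when findTask keeps failing, relax the constraint.
After relaxing, if a more local task is picked, currentLocalityIndex shrinks again, because every launch assigns the launched task's locality back to the current locality.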
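The relax-then-tighten behaviour can be sketched on its own, outside Spark. The level names mirror the ones in the text, but the wait values, the object name, and `recordLaunch` are assumptions for illustration only.

```scala
// Minimal sketch of delay scheduling state; values and names are illustrative.
object DelaySchedulingSketch {
  // Locality levels ordered from most local to least local.
  val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
  // Assumed wait (in ms) before giving up on each level.
  val waits = Vector(3000L, 3000L, 0L)

  var currentLocalityIndex = 0
  var lastLaunchTime = 0L

  // Relax the level while the wait for the current level has expired.
  def allowedLevel(curTime: Long): String = {
    while (currentLocalityIndex < levels.length - 1 &&
           curTime - lastLaunchTime >= waits(currentLocalityIndex)) {
      // Consume the expired wait so it is not counted again at the next level.
      lastLaunchTime += waits(currentLocalityIndex)
      currentLocalityIndex += 1
    }
    levels(currentLocalityIndex)
  }

  // A successful launch resets the clock and may tighten the level again.
  def recordLaunch(level: String, curTime: Long): Unit = {
    currentLocalityIndex = levels.indexOf(level)
    lastLaunchTime = curTime
  }
}
```

With these assumed waits, offers arriving at 1000 ms, 3500 ms and 6000 ms without any launch in between see PROCESS_LOCAL, NODE_LOCAL and ANY respectively; a PROCESS_LOCAL launch afterwards drops the index back to 0.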

 

3 statusUpdate

Handles status updates, mainly by notifying the DAGScheduler through the listener registered in ClusterScheduler.
Failed tasks, of course, also have to be added back to the pending lists.
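The retry side of this can be pictured with a small sketch. It is not the Spark implementation: the state type, the failure limit of 4, and the `aborted` flag are assumptions chosen only to show the "re-queue until a limit is hit" idea.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch of status handling with bounded retries.
object RetrySketch {
  sealed trait State
  case object Finished extends State
  case object Failed extends State

  val maxFailures = 4                  // assumed limit, not a Spark default
  val numFailures = Array.fill(3)(0)   // failure count per task index
  val pending = new ArrayBuffer[Int]   // stand-in for the pending lists
  var finished = 0
  var aborted = false

  def statusUpdate(index: Int, state: State): Unit = state match {
    case Finished =>
      finished += 1
    case Failed =>
      numFailures(index) += 1
      // Give up on the whole task set once a task fails too often,
      // otherwise put the task back so it can be offered again.
      if (numFailures(index) >= maxFailures) aborted = true
      else pending += index
  }
}
```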

/**
 * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
 * the status of each task, retries tasks if they fail (up to a limited number of times), and
 * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
 * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
 * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the
 * ClusterScheduler (e.g. its event handlers). It should not be called from other threads.
 */
private[spark] class ClusterTaskSetManager(
    sched: ClusterScheduler,
    val taskSet: TaskSet,
    clock: Clock = SystemClock)
  extends TaskSetManager
  with Logging
{
  val tasks = taskSet.tasks
  val numTasks = tasks.length

  // Set of pending tasks for each executor. These collections are actually
  // treated as stacks, in which new tasks are added to the end of the
  // ArrayBuffer and removed from the end. This makes it faster to detect
  // tasks that repeatedly fail because whenever a task failed, it is put
  // back at the head of the stack. They are also only cleaned up lazily;
  // when a task is launched, it remains in all the pending lists except
  // the one that it was launched from, but gets removed from them later.
  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
  // but at host level.
  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

  // Set of pending tasks for each rack -- similar to the above.
  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

  // Set containing pending tasks with no locality preferences.
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]

  // Set containing all pending tasks (also used as a stack, as above).
  val allPendingTasks = new ArrayBuffer[Int]

  // Tasks that can be speculated. Since these will be a small fraction of total
  // tasks, we'll just hold them in a HashSet.
  val speculatableTasks = new HashSet[Int]
 
  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
  val myLocalityLevels = computeValidLocalityLevels() // the locality levels present among this TaskSet's tasks
  val localityWaits = myLocalityLevels.map(getLocalityWait) // default wait for each locality level (read from config)

  // Delay scheduling variables: we keep track of our current locality level and the time we
  // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
  // We then move down if we manage to launch a "more local" task.
  var currentLocalityIndex = 0 // index into myLocalityLevels; starts at 0, i.e. scheduling starts at the most local level
  var lastLaunchTime = clock.getTime() // time of the last task launch, used to detect timeouts; on timeout, currentLocalityIndex += 1
 
  /**
   * Add a task to all the pending-task lists that it should be on. If readding is set, we are
   * re-adding the task so only include it in each list if it's not already there.
   */
  private def addPendingTask(index: Int, readding: Boolean = false) {
    // Utility method that adds `index` to a list only if readding=false or it's not already there
    def addTo(list: ArrayBuffer[Int]) {
      if (!readding || !list.contains(index)) { // a new task, or one not yet in this list
        list += index
      }
    }

    var hadAliveLocations = false
    for (loc <- tasks(index).preferredLocations) {
      for (execId <- loc.executorId) {
        if (sched.isExecutorAlive(execId)) {
          addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) // first, add to the executor's list
          hadAliveLocations = true
        }
      }
      if (sched.hasExecutorsAliveOnHost(loc.host)) {
        addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) // add to the host's list
        for (rack <- sched.getRackForHost(loc.host)) {
          addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) // add to the rack's list
        }
        hadAliveLocations = true
      }
    }

    if (!hadAliveLocations) { // nothing above matched, or there were no preferred locations at all
      // Even though the task might've had preferred locations, all of those hosts or executors
      // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
      addTo(pendingTasksWithNoPrefs)
    }

    if (!readding) { // a new task also goes into allPendingTasks
      allPendingTasks += index  // No point scanning this whole list to find the old task there
    }
  }
 
  /**
   * Dequeue a pending task for a given node and return its index and locality level.
   * Only search for tasks matching the given locality constraint.
   */
  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
    // Try the executor-level list first.
    // findTaskFromList: dequeue a pending task from the given list and return its index.
    for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    // Host level: check the locality constraint first
    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { // locality >= TaskLocality.NODE_LOCAL
      for (index <- findTaskFromList(getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }

    // Rack level: check the locality constraint first
    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }

    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
    for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }

    // Finally, if all else has failed, find a speculative task
    return findSpeculativeTask(execId, host, locality)
  }

 

  /**
   * Respond to an offer of a single executor from the scheduler by finding a task
   */
  override def resourceOffer(
      execId: String,
      host: String,
      availableCpus: Int,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    // Precondition: unfinished tasks remain and at least CPUS_PER_TASK cores are available
    if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
      val curTime = clock.getTime()

      var allowedLocality = getAllowedLocalityLevel(curTime) // the locality level currently allowed
      if (allowedLocality > maxLocality) { // must not exceed the maxLocality imposed by the caller
        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
      }

      // findTask simply tries each locality level in turn; match on its result
      findTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality)) => {
          // Found a task; do some bookkeeping and return a task description
          val task = tasks(index)
          val taskId = sched.newTaskId()
          // Figure out whether this should count as a preferred launch
          logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
            taskSet.id, index, taskId, execId, host, taskLocality))
          // Do various bookkeeping
          copiesRunning(index) += 1
          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
          taskInfos(taskId) = info
          taskAttempts(index) = info :: taskAttempts(index)
          // Update our locality level for delay scheduling.
          // The index may decrease here, because taskLocality <= the current locality.
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime // record the launch time
          // Serialize and return the task
          val startTime = clock.getTime()
          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
          // we assume the task can be serialized without exceptions.
          val serializedTask = Task.serializeWithDependencies(
            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          val timeTaken = clock.getTime() - startTime
          increaseRunningTasks(1)
          logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
            taskSet.id, index, serializedTask.limit, timeTaken))
          val taskName = "task %s:%d".format(taskSet.id, index)
          if (taskAttempts(index).size == 1)
            taskStarted(task,info)
          // Return the task that was chosen
          return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
        }
        case _ =>
      }
    }
    return None
  }
 
  /**
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) && // the wait has expired
        currentLocalityIndex < myLocalityLevels.length - 1)
    {
      // Jump to the next locality level, and remove our waiting time for the current one since
      // we don't want to count it again on the next one
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1 // relax to the next level
    }
    myLocalityLevels(currentLocalityIndex)
  }
 
  /**
   * Find the index in myLocalityLevels for a given locality. This is also designed to work with
   * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
   * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
   */
  def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = { // look up the index of locality in myLocalityLevels
    var index = 0
    while (locality > myLocalityLevels(index)) {
      index += 1
    }
    index
  }
 
  /**
   * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
   * added to queues using addPendingTask.
   */
  // Only inspects the pending lists to see which locality levels the tasks in this
  // TaskSet prefer, ordered from most local to least local
  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
      levels += RACK_LOCAL
    }
    levels += ANY
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    levels.toArray
  }
 
  /** Called by cluster scheduler when one of our tasks changes state */
  override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    SparkEnv.set(env)
    state match {
      case TaskState.FINISHED =>
        taskFinished(tid, state, serializedData)
      case TaskState.LOST =>
        taskLost(tid, state, serializedData)
      case TaskState.FAILED =>
        taskLost(tid, state, serializedData)
      case TaskState.KILLED =>
        taskLost(tid, state, serializedData)
      case _ =>
    }
  }

  def taskStarted(task: Task[_], info: TaskInfo) {
    sched.listener.taskStarted(task, info)
  }
}

 

Pool

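An abstraction over schedulableQueue. So what is a Schedulable?

According to the comment, Schedulables comprise Pools and TaskSetManagers. The design here is questionable: the core interfaces of Pools and TaskSetManagers are completely different, and although TaskSetManagers do implement these interfaces, the implementations are meaningless.
Put simply, the author wanted to treat Pools and TaskSetManagers uniformly by generalizing over them, hence this design.

So a Pool can be understood as a container of TaskSetManagers. And since a Pool is itself a Schedulable, the container can also hold other Pools.

The core interface is getSortedTaskSetQueue, which schedules TaskSetManagers (or pools) according to the configured SchedulingAlgorithm.

Note that FIFO and FAIR are both used to schedule TaskSets. Since each TaskSet corresponds to a stage, the basic unit of scheduling in Spark is the stage.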

/**
 * An interface for schedulable entities.
 * there are two type of Schedulable entities(Pools and TaskSetManagers)
 */
private[spark] trait Schedulable {
  var parent: Schedulable
  // child queues
  def schedulableQueue: ArrayBuffer[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int
  def stageId: Int
  def name: String

  def increaseRunningTasks(taskNum: Int): Unit
  def decreaseRunningTasks(taskNum: Int): Unit
  def addSchedulable(schedulable: Schedulable): Unit
  def removeSchedulable(schedulable: Schedulable): Unit
  def getSchedulableByName(name: String): Schedulable
  def executorLost(executorId: String, host: String): Unit
  def checkSpeculatableTasks(): Boolean
  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
  def hasPendingTasks(): Boolean
}

 

package org.apache.spark.scheduler.cluster

// An Schedulable entity that represent collection of Pools or TaskSetManagers
private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable
  with Logging {

  var schedulableQueue = new ArrayBuffer[Schedulable] // holds the child Schedulables (TaskSetManagers or Pools)
  var schedulableNameToSchedulable = new HashMap[String, Schedulable]

  var priority = 0
  var stageId = 0
  var name = poolName
  var parent:Schedulable = null

  // A SchedulingAlgorithm just defines a comparator, which is later used to sort the TaskSets
  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm() // Fair
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm() // FIFO
    }
  }

  override def addSchedulable(schedulable: Schedulable) { // add a child (TaskSetManager or Pool)
    schedulableQueue += schedulable
    schedulableNameToSchedulable(schedulable.name) = schedulable
    schedulable.parent= this
  }

  override def removeSchedulable(schedulable: Schedulable) { // remove a child
    schedulableQueue -= schedulable
    schedulableNameToSchedulable -= schedulable.name
  }

  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { // return the TaskSetManagers in sorted order
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) // sortWith
    for (schedulable <- sortedSchedulableQueue) {
      // The child may itself be a pool, hence the recursive call
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
    }
    return sortedTaskSetQueue
  }
}
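To make the recursion in getSortedTaskSetQueue concrete, here is a small standalone sketch of "sort the children, then flatten pools recursively". The node types and the comparator are assumptions for illustration: the ordering used here (lower priority value first, ties broken by stage id) is only a FIFO-style guess and is not taken from the listing above.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative tree of pools and task sets; not the Spark classes.
object PoolSketch {
  sealed trait Node { def name: String; def priority: Int; def stageId: Int }

  // Leaf node: stands in for a TaskSetManager.
  final case class Leaf(name: String, priority: Int, stageId: Int) extends Node

  // Inner node: stands in for a Pool, which may hold leaves or other pools.
  final class PoolNode(val name: String, val priority: Int = 0,
                       val stageId: Int = 0) extends Node {
    val children = new ArrayBuffer[Node]
    def add(n: Node): Unit = children += n
  }

  // Assumed FIFO-style ordering: lower priority first, then lower stage id.
  def fifoLess(a: Node, b: Node): Boolean =
    if (a.priority != b.priority) a.priority < b.priority
    else a.stageId < b.stageId

  // Sort each level with the comparator, then flatten depth-first.
  def sortedLeaves(node: Node): Vector[Leaf] = node match {
    case leaf: Leaf => Vector(leaf)
    case pool: PoolNode =>
      pool.children.sortWith(fifoLess).toVector.flatMap(sortedLeaves)
  }
}
```

Because the comparator is applied per level, a nested pool is ordered against its siblings as a whole, and its own children are then ordered among themselves.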

 

SchedulableBuilder

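As noted above, a Pool can hold TaskSetManagers as well as other pools, so together they can form a tree.

SchedulableBuilder encapsulates the Schedulable tree: it builds the tree out of TaskSetManagers (leaf nodes) and pools (inner nodes).
Only the simplest builder, FIFO, is listed here, and it does not really look like a tree.
FIFO is simple: a single Pool is enough. Every TaskSet is added to it with addSchedulable and then read back in sorted order.

The Fair implementation is not listed here. It is more complex and is left for a later analysis.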

/**
 * An interface to build Schedulable tree
 * buildPools: build the tree nodes(pools)
 * addTaskSetManager: build the leaf nodes(TaskSetManagers)
 */
private[spark] trait SchedulableBuilder {
  def buildPools()
  def addTaskSetManager(manager: Schedulable, properties: Properties)
}

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
}
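Since the FIFO builder keeps everything in one pool, the tree shape is easier to see with a builder that routes task sets into named child pools. The sketch below is purely hypothetical and is not the Fair builder: the class names, the property key `sketch.pool`, and the "default" fallback are all made up for illustration.

```scala
import java.util.Properties
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical two-level builder: root pool -> named pools -> task sets.
object TreeBuilderSketch {
  final class SimplePool(val name: String) {
    val pools = new HashMap[String, SimplePool]  // child pools (inner nodes)
    val taskSets = new ArrayBuffer[String]       // task set names (leaves)
  }

  // Made-up property key used to pick the target pool.
  val PoolKey = "sketch.pool"

  final class PooledBuilder(val root: SimplePool) {
    def addTaskSet(taskSetName: String, props: Properties): Unit = {
      // Fall back to a "default" pool when no properties or no key is given.
      val poolName = Option(props)
        .map(_.getProperty(PoolKey, "default"))
        .getOrElse("default")
      // Create the child pool on first use, then attach the leaf to it.
      val pool = root.pools.getOrElseUpdate(poolName, new SimplePool(poolName))
      pool.taskSets += taskSetName
    }
  }
}
```

Compared with the FIFO builder, the only structural difference is the extra level: leaves are attached to a child pool chosen per task set instead of directly to the root.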

Reposted from: http://gxlla.baihongyu.com/
