taskscheduler是什么(taskscheduler服务)

 分类:IT知识时间:2022-07-01 07:30:16点击:

TaskScheduler的核心任务时提交的TaskSet到集群运算并汇报结果

  • 为TaskSet创建和维护一个TaskSetManager,并跟踪任务的本定性以及错误信息

  • 遇到Straggle任务时,会放到其他节点进行重试。

  • 向DAGScheduler汇报执行情况,包括Shuffle输出丢失的时候报告fetch failed错误等信息。

TaskScheduler源码,位于
org.apache.spark.scheduler.TaskScheduler

SOLIDWORKS Task Scheduler工具介绍

 /**
 * Low-level task scheduler interface, currently implemented exclusively by
 * [[org.apache.spark.scheduler.TaskSchedulerImpl]].
 * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
 * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
 * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
 * them, retrying if there are failures, and mitigating stragglers. They return events to the
 * DAGScheduler.
 */
 /**
 * TaskScheduler任务调度接口。从每一个Stage中的DAGScheduler中获取TaskSet,运行他们,尝试是否有故障。
 * DAGScheduler是高层调度,计算每个Job的Stage的DAG,
 * 然后提交Stage,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。
 */
 private[spark] trait TaskScheduler {
 
 private val appId = "spark-application-" + System.currentTimeMillis
 
 def rootPool: Pool
 
 def schedulingMode: SchedulingMode
 
 def start(): Unit
 
 // Invoked after system has successfully initialized (typically in spark context).
 // Yarn uses this to bootstrap allocation of resources based on preferred locations,
 // wait for slave registrations, etc.
 def postStartHook() { }
 
 // Disconnect from the cluster.
 def stop(): Unit
 
 // Submit a sequence of tasks to run.
 // 提交要运行的任务序列
 def submitTasks(taskSet: TaskSet): Unit
 
 // Cancel a stage.
 def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
 
 /**
 * Kills a task attempt.
 *
 * @return Whether the task was successfully killed.
 */
 def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
 
 // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
 def setDAGScheduler(dagScheduler: DAGScheduler): Unit
 
 // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
 def defaultParallelism(): Int
 
 /**
 * Update metrics for in-progress tasks and let the master know that the BlockManager is still
 * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
 * indicating that the block manager should re-register.
 */
 def executorHeartbeatReceived(
 execId: String,
 accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
 blockManagerId: BlockManagerId): Boolean
 
 /**
 * Get an application ID associated with the job.
 *
 * @return An application ID
 */
 def applicationId(): String = appId
 
 /**
 * Process a lost executor
 */
 def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 
 /**
 * Get an application's attempt ID associated with the job.
 *
 * @return An application's Attempt ID
 */
 def applicationAttemptId(): Option[String]
 
 }

TaskScheduler原理剖析

DAGScheduler将划分的一系列Stage,按照Stage的先后顺序依次提交给底层的TaskScheduler取执行。在SparkContext实例化的时候,TaskScheduler以及SchedulerBackend就已经在SparkContext的createTaskScheduler中创建实例对象了。

虽然Spark支持多种部署模式,但是底层调度器TaskScheduler接口的实现类都是TaskSchedulerImpl。对于SchedulerBackend的实现,只专注与Standalone部署模式下的具体实现
StandaloneSchedulerBackend来做分析。

TaskSchedulerImpl在createTaskScheduler方法实例化后,就立即调用自己的initialize方法把
StandaloneSchedulerBackend的实例对象传进去,赋值给TaskSchedulerImpl的backend。在initialize方法中,根据调度模式的配置创建实现了SchedulableBuilder接口的相应的实例对象,并且创建的对象会立即调用buildPools创建调度池存放和管理TaskSetManager的实例对象。实现SchedulableBuilder接口的具体类都是SchedulableBuilder的内部类。

  • FIFOSchedulableBuilder:调度模式是SchedulingMode.FIFO,使用先进先出策略调度。先进先出(FIFO)为默认模式。在给模式下只有一个TaskSetManager池。

  • FairSchedulableBuilder:调度模式是SchedulingMode.FAIR,使用公平策略调度。

在createTaskScheduler方法返回后,TaskSchedulerImpl通过DAGScheduler的实例化过程设置DAGScheduler的实例对象。然后调用自己的start方法。在TaskSchedulerImpl调用start方法的时候,会调用
StandaloneSchedulerBackend的start方法,在StandaloneSchedulerBackend的start方法中,会最终注册应用程序AppClient。TaskSchedulerImpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。

TaskSchedulerImpl启动后,就可以接受DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行京一步处理。TaskSchedulerImpl在submitTasks中初始化一个TaskSetManager,对其生命周期进行管理,当TaskSchedulerImpl得到Worker节点上的Executor计算资源的时候,会通TaskSetManager发送具体的Task到Executor上执行计算。

如果Task执行过程中有错误导致失败,会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前Task。TaskSetManager会将失败的Task再次添加到待执行Task队列中。Spark Task允许执行失败的次数默认是4次,在TaskSchedulerImpl初始化的时候,通过spark.task.maxFailures设置该值。

如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler,DAGScheduler根据是否还存在待执行的Stage,继续迭代提交相应的TaskSet给TaskScheduler取执行,或者输出Job的结果。


除注明外的文章,均为来源:老汤博客,转载请保留本文地址!
原文地址: