本文记录在Spark Applications UI中Kill job时的流程,其实也就是cancel Job的流程。
界面点击kill链接,会执行/jobs/job/kill?id=${jobId}。首先看看SparkUI中对于这个链接的注册:
从上面我们可以看出,在点击了/jobs/job/kill链接后,会交由 jobsTab.handleKillRequest来进行响应处理。
JobsTab.handleKillRequest方法定义如下:
上面的代码首先验证用户权限,然后获取job的id,最后调用SparkContext的cancelJob方法来取消Job。
在SparkContext中,cancel方法只是简单的调用了DAGScheduler的scancelJob方法。
而DAGScheduler的scancelJob方法,只是将一个JobCancelled对象加入到DAGScheduler的eventProcessLoop(具体实现为DAGSchedulerEventProcessLoop对象)队列中。于是我们转到DAGSchedulerEventProcessLoop的doOnReceive方法中。
|
|
于是,又将事件交给 DAGScheduler的handleJobCancellation方法处理了。handleJobCancellation的定义如下:
方法中,首先会检测jobId是否归属与某个Stage,只有Job归属于某个Stage,才会将job进行处理,调用failJobAndIndependentStages方法。
|
|
这块的逻辑大概如下:首先从配置中获取spark.job.interruptOnCancel参数的值,用于决定在cancel task的时候是否要中断线程。然后根据jobId,获取它所处于的Stage。根据Stage的id,调用taskScheduler取消task,并将Stage标记为完成。
其中取消task的代码为:
继续跳转到TaskScheduler的cancelTasks方法,这个方法是抽象方法,具体的实现是由TaskSchedulerImpl来实现的。那么我们看一下 TashSchedulerImpl的cancelTasks方法:
cancelTasks的实现逻辑是:根据stageId得到运行task set的TaskSetManager,然后根据TaskSetManager得到正在运行的task集合,根据task的id从taskIdToExecutorId映射关系中得到运行task的Executor的id,这样调用SchedulerBackend的killTask方法,将executorId和taskId作为参数,杀掉task。
对于SchedulerBackend的实现,我们在启动executor的时候就知道了,对应的实现为CoarseGrainedSchedulerBackend。
CoarseGrainedSchedulerBackend.killTask
于是,就向driver发送了一个 KillTask对象。driver会通过endpoint向executorId所对应的Executor发送kill task的命令。我们跳到Executor中killTask的实现:
这里就是kill task的具体实现,从代码中我们可以知道,kill task有两种逻辑:
直接调用TaskRunner对象的kill,什么也不操作。
除了调用TaskRunner对象的kill,还会使用Reaper对Task进行dump操作。
如果要是启用Reaper,需要开启spark.task.reaper.enabled参数。
对于Reaper的实现逻辑:调用TaskRunner的kill,然后对Task对应的线程进行dump操作。关于reaper还有好几个参数,用来定义Reaper的逻辑:
直接调用TaskRunner的kill,就是不进行Reaper监控。而且我们也知道了,使用Reaper进行监控,其内部也会调用TaskRunner的kill,所以下面我们进行TaskRunner.kill方法的逻辑分析:
TaskRunner中保存着Task的引用,调用Task的kill方法。
这里有三个逻辑:
设置_reasonIfKilled,如果Task还没有运行,设置了这个值后,task运行时会直接退出。一旦task运行了,设置这个值是不会停止的,因此需要第二个方式。
设置context.markInterrupted(reason),一旦Task运行了,就会在其内部构建一个TaskContext,调用这个方法,进行状态标记。在执行RDD中数据读取(调用InterruptibleIterator的next方法)的时候会调用TaskContext的killTaskIfInterrupted方法进行中断验证,如果已经调用过markInterrupted方法,killTaskIfInterrupted方法就会抛出异常,终止Task的运行。
参数指定要中断线程,直接执行线程中断。
这里在附加一下InterruptibleIterator的实现:
其实就是对dlegate所指定的Iterator进行了一层包装。查找InterruptibleIterator的使用,RDD的getOrCompute方法返回的Iterator就是InterruptibleIterator类型。
我们知道Task读取数据时,就会用到RDD的getOrCompute方法,因此就可以让task中断啦。对于InterruptibleIterator的使用,不只是RDD,还有BlockStoreShuffleReader、NewHadoopRDD等都在使用。