Spark 2.11: Killing a Job from the Spark UI

This post traces what happens when you kill a job from the Spark application UI, which is essentially the job-cancellation flow.

Clicking the kill link in the UI sends a request to /jobs/job/kill?id=${jobId}. Let's first look at how SparkUI registers the handler for this path:

attachHandler(createRedirectHandler(
  "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
  "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST")))

From this we can see that a request to /jobs/job/kill is handed to jobsTab.handleKillRequest for processing.
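
The same endpoint can also be hit outside the browser, as long as spark.ui.killEnabled has not been turned off. A minimal sketch, where localhost:4040 (the default driver UI port) and the job id are illustrative assumptions:

// Minimal sketch: issue the same kill request the UI button sends.
// The host, port (4040 is the default UI port) and job id are assumptions.
import java.net.{HttpURLConnection, URL}

val jobId = 3  // hypothetical id taken from the Jobs page
val conn = new URL(s"http://localhost:4040/jobs/job/kill?id=$jobId")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")   // the handler is registered for both GET and POST
println(s"kill request returned HTTP ${conn.getResponseCode}")
conn.disconnect()
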
JobsTab.handleKillRequest is defined as follows:

def handleKillRequest(request: HttpServletRequest): Unit = {
  if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
    // stripXSS is called first to remove suspicious characters used in XSS attacks
    val jobId = Option(UIUtils.stripXSS(request.getParameter("id"))).map(_.toInt)
    jobId.foreach { id =>
      if (jobProgresslistener.activeJobs.contains(id)) {
        sc.foreach(_.cancelJob(id))
        Thread.sleep(100)
      }
    }
  }
}

The code above first checks that kill is enabled and that the user has modify permission, then reads the job id from the request, and finally calls SparkContext's cancelJob method to cancel the job.
In SparkContext, cancelJob simply delegates to DAGScheduler's cancelJob method.

def cancelJob(jobId: Int): Unit = {
  dagScheduler.cancelJob(jobId, None)
}
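
As an aside, the same method can be called from user code. A minimal sketch, assuming a live SparkContext named sc, that submits an action asynchronously and then cancels its job:

// Minimal sketch (assumes an existing SparkContext `sc`): submit a slow count
// asynchronously, then cancel the underlying job -- the same call the UI kill
// handler ends up making.
val pending = sc.parallelize(1 to 1000000, 100)
  .map { i => Thread.sleep(5); i }
  .countAsync()                                   // returns a FutureAction

Thread.sleep(2000)                                // give some tasks time to start
pending.jobIds.foreach(id => sc.cancelJob(id))    // or simply pending.cancel()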

DAGScheduler's cancelJob, in turn, does nothing more than post a JobCancelled event onto the DAGScheduler's eventProcessLoop (implemented by DAGSchedulerEventProcessLoop). So we move on to DAGSchedulerEventProcessLoop's doOnReceive method.

case JobCancelled(jobId, reason) =>
  dagScheduler.handleJobCancellation(jobId, reason)

The event is therefore handed over to DAGScheduler.handleJobCancellation, which is defined as follows:

private[scheduler] def handleJobCancellation(jobId: Int, reason: Option[String]) {
  if (!jobIdToStageIds.contains(jobId)) {
    logDebug("Trying to cancel unregistered job " + jobId)
  } else {
    failJobAndIndependentStages(
      jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason.getOrElse("")))
  }
}

The method first checks whether any stages are registered for this jobId (via jobIdToStageIds). Only if the job is registered does it proceed, by calling failJobAndIndependentStages.

private def failJobAndIndependentStages(
    job: ActiveJob,
    failureReason: String,
    exception: Option[Throwable] = None): Unit = {
  val error = new SparkException(failureReason, exception.getOrElse(null))
  var ableToCancelStages = true

  val shouldInterruptThread =
    if (job.properties == null) false
    else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean

  // Cancel all independent, running stages.
  val stages = jobIdToStageIds(job.jobId)
  if (stages.isEmpty) {
    logError("No stages registered for job " + job.jobId)
  }
  stages.foreach { stageId =>
    val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
    if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
      logError(
        "Job %d not registered for stage %d even though that stage was registered for the job"
          .format(job.jobId, stageId))
    } else if (jobsForStage.get.size == 1) {
      if (!stageIdToStage.contains(stageId)) {
        logError(s"Missing Stage for stage with id $stageId")
      } else {
        // This is the only job that uses this stage, so fail the stage if it is running.
        val stage = stageIdToStage(stageId)
        if (runningStages.contains(stage)) {
          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
            markStageAsFinished(stage, Some(failureReason))
          } catch {
            case e: UnsupportedOperationException =>
              logInfo(s"Could not cancel tasks for stage $stageId", e)
              ableToCancelStages = false
          }
        }
      }
    }
  }

  if (ableToCancelStages) {
    // SPARK-15783 important to cleanup state first, just for tests where we have some asserts
    // against the state. Otherwise we have a *little* bit of flakiness in the tests.
    cleanupStateForJobAndIndependentStages(job)
    job.listener.jobFailed(error)
    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
  }
}

The logic here, roughly: first read the spark.job.interruptOnCancel property from the job's properties, which decides whether task threads should be interrupted when tasks are cancelled. Then look up the stages belonging to the job by its jobId, and for each running stage that only this job uses, call taskScheduler to cancel its tasks and mark the stage as finished.
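
A job opts into thread interruption on cancel by setting this property on the thread that submits it. A minimal sketch, assuming a live SparkContext named sc (the job-group id and description are illustrative):

// Minimal sketch (assumes an existing SparkContext `sc`): jobs submitted from this
// thread after the call carry spark.job.interruptOnCancel = true in their job
// properties, so cancelling them also interrupts the running task threads.
sc.setLocalProperty("spark.job.interruptOnCancel", "true")

// Equivalent via job groups; the group id and description are illustrative.
sc.setJobGroup("my-group", "cancellable work", interruptOnCancel = true)

Interruption is off by default; the SparkContext.setJobGroup documentation notes this is because of HDFS-1208, where HDFS can mark nodes as dead in response to Thread.interrupt().
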
The actual task-cancellation call is:

taskScheduler.cancelTasks(stageId, shouldInterruptThread) // shouldInterruptThread indicates whether the task thread should be interrupted; if not, other means are used to stop the task, as described later.

Next we follow TaskScheduler's cancelTasks method. It is an abstract method whose concrete implementation is provided by TaskSchedulerImpl, so let's look at TaskSchedulerImpl.cancelTasks:

override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
  logInfo("Cancelling stage " + stageId)
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.foreach { case (_, tsm) =>
      // There are two possible cases here:
      // 1. The task set manager has been created and some tasks have been scheduled.
      //    In this case, send a kill signal to the executors to kill the task and then abort
      //    the stage.
      // 2. The task set manager has been created but no tasks has been scheduled. In this case,
      //    simply abort the stage.
      tsm.runningTasksSet.foreach { tid =>
        taskIdToExecutorId.get(tid).foreach(execId =>
          backend.killTask(tid, execId, interruptThread, reason = "Stage cancelled"))
      }
      tsm.abort("Stage %s cancelled".format(stageId))
      logInfo("Stage %d was cancelled".format(stageId))
    }
  }
}

The implementation of cancelTasks works like this: look up the TaskSetManagers for the stage by its stageId, take the set of currently running tasks from each TaskSetManager, map each task id to the id of the executor running it through taskIdToExecutorId, and then call SchedulerBackend's killTask method with that executorId and taskId to kill the task.
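
As an aside, newer Spark releases also expose this task-level kill to user code through SparkContext.killTaskAttempt. Whether it is available depends on your version, so treat the following as a hedged sketch:

// Hedged sketch (API availability depends on the Spark version; verify in yours):
// kill a single running task attempt by its task id, for example one copied from
// the Stage page of the UI. The id used here is purely illustrative.
val taskId = 42L
sc.killTaskAttempt(taskId, interruptThread = false, reason = "killed manually for testing")
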
We already know from how executors are launched that the SchedulerBackend implementation here is CoarseGrainedSchedulerBackend.
CoarseGrainedSchedulerBackend.killTask

override def killTask(
    taskId: Long, executorId: String, interruptThread: Boolean, reason: String) {
  driverEndpoint.send(KillTask(taskId, executorId, interruptThread, reason))
}

So a KillTask message is sent to the driver. The driver then forwards the kill command, through the corresponding endpoint, to the Executor identified by executorId. Let's jump to the killTask implementation in Executor:

def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
  val taskRunner = runningTasks.get(taskId)
  if (taskRunner != null) {
    if (taskReaperEnabled) {
      val maybeNewTaskReaper: Option[TaskReaper] = taskReaperForTask.synchronized {
        val shouldCreateReaper = taskReaperForTask.get(taskId) match {
          case None => true
          case Some(existingReaper) => interruptThread && !existingReaper.interruptThread
        }
        if (shouldCreateReaper) {
          val taskReaper = new TaskReaper(
            taskRunner, interruptThread = interruptThread, reason = reason)
          taskReaperForTask(taskId) = taskReaper
          Some(taskReaper)
        } else {
          None
        }
      }
      // Execute the TaskReaper from outside of the synchronized block.
      maybeNewTaskReaper.foreach(taskReaperPool.execute)
    } else {
      taskRunner.kill(interruptThread = interruptThread, reason = reason)
    }
  }
}

This is the concrete implementation of killing a task. From the code we can see that there are two paths:

Call TaskRunner's kill directly, and do nothing else.
Call TaskRunner's kill and, in addition, hand the task to a TaskReaper, which monitors it and dumps its thread.

The reaper path is only taken if the spark.task.reaper.enabled parameter is turned on.

The reaper's logic is: call TaskRunner's kill, then keep monitoring the killed task and dump its thread. There are several other spark.task.reaper.* parameters that shape the reaper's behaviour, as shown in the sketch below.
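
A hedged configuration sketch; the parameter names follow the Spark configuration documentation, but check the defaults for your version:

// Hedged sketch: enabling the task reaper and tuning it through SparkConf.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.task.reaper.enabled", "true")         // turn the reaper on (off by default)
  .set("spark.task.reaper.pollingInterval", "10s")  // how often to re-check a killed task
  .set("spark.task.reaper.threadDump", "true")      // log a thread dump on each poll
  .set("spark.task.reaper.killTimeout", "-1")       // if > 0, kill the executor JVM when the task will not die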

Calling TaskRunner's kill directly simply means there is no reaper monitoring. And as we have seen, the reaper path also calls TaskRunner's kill internally, so next let's analyse the logic of TaskRunner.kill:

def kill(interruptThread: Boolean, reason: String): Unit = {
  logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason")
  reasonIfKilled = Some(reason)
  if (task != null) {
    synchronized {
      if (!finished) {
        task.kill(interruptThread, reason)
      }
    }
  }
}

TaskRunner holds a reference to the Task and calls the Task's kill method.

def kill(interruptThread: Boolean, reason: String) {
  require(reason != null)
  _reasonIfKilled = reason
  if (context != null) {
    context.markInterrupted(reason)
  }
  if (interruptThread && taskThread != null) {
    taskThread.interrupt()
  }
}

Three things happen here:

Set _reasonIfKilled. If the task has not started running yet, this flag makes it exit right away when it does start. Once the task is already running, setting the flag alone will not stop it, which is why the second mechanism is needed.
Call context.markInterrupted(reason). Once the task is running it has a TaskContext, and this call marks it as interrupted. When the task reads data (through an InterruptibleIterator, whose hasNext calls TaskContext's killTaskIfInterrupted), that check throws an exception and terminates the task if markInterrupted has already been called.
If interruptThread is set, interrupt the task's thread directly.

For completeness, here is the implementation of InterruptibleIterator:

class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  def next(): T = delegate.next()
}

It is simply a thin wrapper around the Iterator passed in as delegate. If you search for usages of InterruptibleIterator, you will find that the Iterator returned by RDD's getOrCompute method is of this type.
Since a task reads its data through RDD.getOrCompute, this is what makes the task interruptible. And RDD is not the only user: BlockStoreShuffleReader, NewHadoopRDD and others wrap their iterators in InterruptibleIterator as well.
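
If a task spends most of its time inside user code rather than pulling records through these iterators, a cancel with interruptThread = false may not take effect promptly. A hedged sketch of how user code can cooperate, in the same spirit as InterruptibleIterator, using the public TaskContext.isInterrupted check (sc is a live SparkContext and heavyWork is a hypothetical placeholder):

// Hedged sketch: cooperating with task cancellation from user code.
import org.apache.spark.TaskContext

val result = sc.parallelize(1 to 100000, 20).mapPartitions { iter =>
  val ctx = TaskContext.get()
  def heavyWork(i: Int): Int = { Thread.sleep(50); i * 2 }  // hypothetical expensive step
  iter.map { x =>
    if (ctx.isInterrupted()) {
      // the task has been marked as killed (e.g. via the UI kill link); bail out
      throw new InterruptedException("task was killed")
    }
    heavyWork(x)
  }
}
// result.count() would run the job; killing it from the UI now stops quickly
// even with interruptThread = false, because every record checks the flag.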