Spark 2.11 ApplicationMaster

This article is a walkthrough of the org.apache.spark.deploy.yarn.ApplicationMaster source code; the Spark version used is 2.11.

Overview

ApplicationMaster can be regarded as the entry class for running the user program. Its responsibilities include parsing the user's arguments, starting the driver, establishing communication with the driver, starting the reporter thread, and launching the user class.

Analysis of the main methods

The main method of the ApplicationMaster companion object

This method is the startup entry point of ApplicationMaster. It is defined as follows:

def main(args: Array[String]): Unit = {
  // Register the logger for signal handling
  SignalUtils.registerLogger(log)
  // Instantiate ApplicationMasterArguments to parse the arguments passed in
  val amArgs = new ApplicationMasterArguments(args)
  // If --properties-file was set, load the configuration from that properties file
  // into the system properties
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sys.props(k) = v
    }
  }
  // Run the ApplicationMaster as the designated Spark user
  SparkHadoopUtil.get.runAsSparkUser { () =>
    master = new ApplicationMaster(amArgs, new YarnRMClient)
    // Run the application master
    System.exit(master.run())
  }
}

As the code shows, this method is simply the class's startup method: it registers the logger, parses the incoming arguments (if a properties file was passed, the configuration in it is loaded into the system properties), and finally starts the ApplicationMaster under a specific user identity by calling its run method.
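
As a standalone illustration of the --properties-file handling above, the sketch below loads a properties file into the JVM system properties the same way main does with sys.props(k) = v. The file path is hypothetical, and java.util.Properties is used here instead of Spark's internal Utils.getPropertiesFromFile.

import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

object PropsFileDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical path; in the real flow it comes from the --properties-file argument
    val path = "/tmp/spark-am.properties"
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    // Copy every entry into the system properties, as main() does with sys.props(k) = v
    props.asScala.foreach { case (k, v) => sys.props(k) = v }
    println(sys.props.get("spark.ui.port"))
  }
}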

run

This method starts the ApplicationMaster. It is defined as follows:

final def run(): Int = {
  try {
    // Get the application attempt id
    val appAttemptId = client.getAttemptId()
    var attemptID: Option[String] = None
    // In cluster mode, set the properties that cluster mode requires
    if (isClusterMode) {
      // Set the web ui port to be ephemeral for yarn so we don't conflict with
      // other spark processes running on the same box
      System.setProperty("spark.ui.port", "0")
      // Set the master and deploy mode property to match the requested mode.
      System.setProperty("spark.master", "yarn")
      System.setProperty("spark.submit.deployMode", "cluster")
      // Set this internal configuration if it is running on cluster mode, this
      // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
      attemptID = Option(appAttemptId.getAttemptId.toString)
    }
    // <1>
    // Set the caller context, read from the spark.log.callerContext configuration
    new CallerContext(
      "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
    logInfo("ApplicationAttemptId: " + appAttemptId)
    // This shutdown hook should run *after* the SparkContext is shut down.
    // Register a shutdown hook so that it runs after the SparkContext's own hook
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }
      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir()
        }
      }
    }
    // <2>
    // Call this to force generation of secret so it gets populated into the
    // Hadoop UGI. This has to happen before the startUserApplication which does a
    // doAs in order for the credentials to be passed on to the executor containers.
    // Create the security manager from the Spark configuration
    val securityMgr = new SecurityManager(sparkConf)
    // If the credentials file config is present, we must periodically renew tokens. So create
    // a new AMDelegationTokenRenewer
    // spark.yarn.credentials.file
    if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
      // If a principal and keytab have been set, use that to create new credentials for executors
      // periodically
      credentialRenewer =
        new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
      credentialRenewer.scheduleLoginFromKeytab()
    }
    // <3>
    // Call a different method depending on the deploy mode
    if (isClusterMode) {
      runDriver(securityMgr)
    } else {
      runExecutorLauncher(securityMgr)
    }
    // <4>
  } catch {
    case e: Exception =>
      // catch everything else if not specifically handled
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + e)
  }
  exitCode
}

<1> This part sets some system properties for later use.

<2> This part sets the caller context and registers the shutdown hook so that cleanup happens after the SparkContext is destroyed. The hook is actually handed over to the SparkShutdownHookManager object for processing.

<3> This part performs some security-related setup, which deserves a closer look later.

<4> This part starts the service, distinguishing between cluster mode and client mode. The overall steps executed internally are the same; only the way each step is carried out differs.
The following sections look at cluster mode first, then client mode.
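
To make the priority in <2> concrete, here is a minimal sketch (not Spark's actual ShutdownHookManager) of priority-ordered shutdown hooks: higher priorities run first, so a hook registered at SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1 runs after the SparkContext's own hook. The numeric value of the priority constant is a placeholder for illustration.

import scala.collection.mutable

object PriorityHooksDemo {
  private val hooks = mutable.ArrayBuffer.empty[(Int, () => Unit)]

  // Register a hook with a priority; higher priorities run first at JVM shutdown
  def addShutdownHook(priority: Int)(body: () => Unit): Unit =
    hooks += ((priority, body))

  def main(args: Array[String]): Unit = {
    // A single JVM shutdown hook runs the registered hooks in descending priority order,
    // mimicking what Spark's ShutdownHookManager does
    sys.addShutdownHook {
      hooks.sortBy(-_._1).foreach { case (_, body) => body() }
    }
    val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50 // placeholder for the real constant's value
    addShutdownHook(SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => println("stop SparkContext") }
    addShutdownHook(SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () =>
      println("AM hook: report final status and unregister")
    }
  }
}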

runDriver

Runs the driver. This method is executed only in cluster mode. It is defined as follows:

private def runDriver(securityMgr: SecurityManager): Unit = {
  // Add the AM IP filter
  addAmIpFilter()
  // Start the user application: use reflection to run the main method of the user-specified class
  userClassThread = startUserApplication()
  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  // spark.yarn.am.waitTime
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    // Wait for the SparkContext to be created, for at most totalWaitTime milliseconds
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      rpcEnv = sc.env.rpcEnv
      // Start the driver-facing RPC endpoint
      val driverRef = runAMEndpoint(
        sc.getConf.get("spark.driver.host"),
        sc.getConf.get("spark.driver.port"),
        isClusterMode = true)
      // Register the application master
      registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      if (!finished) {
        throw new IllegalStateException("SparkContext is null but app is still running!")
      }
    }
    // Wait for the user thread to finish
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
        "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  }
}

This method is fairly important, so it is worth a closer look. Judging from the code, addAmIpFilter adds an IP filter to the Spark UI, and it also distinguishes between deploy modes: in cluster mode the filter information (the filter class and its parameters) is written into the system properties, while in client mode it is sent to the driver over RPC. This shows that in cluster mode the driver runs locally, inside the ApplicationMaster process, whereas in client mode it runs elsewhere, on the machine that submitted the application. Incidentally, the filter that addAmIpFilter actually installs is org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.
Next, the method starts the user class (the class passed in via --class) in a separate thread. Roughly speaking, starting the user class means obtaining a class loader, loading the class via reflection, and finally invoking the user class's main method.
Then it obtains the SparkContext and gets the driver endpoint via runAMEndpoint; this driver endpoint is passed as a parameter when registering the ApplicationMaster with the ResourceManager.
After that, it registers the ApplicationMaster with the ResourceManager.
Finally, it waits for the user class to finish executing.
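
The wait on sparkContextPromise above is the key synchronization point between the AM thread and the user-class thread. The sketch below reproduces that handoff pattern in isolation, with a plain String standing in for the SparkContext and scala.concurrent.Await used in place of Spark's ThreadUtils.awaitResult; the timings are made up.

import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object ContextHandoffDemo {
  def main(args: Array[String]): Unit = {
    val contextPromise = Promise[String]()

    // The "user class" thread fulfils the promise once its context is ready
    val userThread = new Thread {
      override def run(): Unit = {
        Thread.sleep(500) // simulate the user code initialising things
        contextPromise.trySuccess("SparkContext stand-in")
      }
    }
    userThread.setName("Driver")
    userThread.start()

    // The "AM" thread blocks on the future with a timeout, like runDriver does
    val sc = Await.result(contextPromise.future, Duration(10000, TimeUnit.MILLISECONDS))
    println(s"got: $sc")
    userThread.join()
  }
}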

runExecutorLauncher

The runExecutorLauncher method plays the same role as runDriver above, but for client mode. It is defined as follows:

private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
  val port = sparkConf.get(AM_PORT)
  rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
    clientMode = true)
  val driverRef = waitForSparkDriver()
  addAmIpFilter()
  registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
    securityMgr)
  // In client mode the actor will stop the reporter thread.
  reporterThread.join()
}

Compared with runDriver, this method is much simpler. It first creates an RpcEnv bound to the local host, then waits for the Spark driver to finish starting, adds the IP filter (sent to the driver through the AM endpoint), and finally registers the ApplicationMaster with the ResourceManager.
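
waitForSparkDriver itself is not shown here, but the idea behind it is a retry-until-reachable loop against the driver's host and port. The following is only a sketch of that idea, not the actual implementation (which also builds an RPC reference once connected); the host, port, and timeout values are made up.

import java.net.{InetSocketAddress, Socket}

object WaitForDriverDemo {
  // Keep trying to open a TCP connection to the driver until it succeeds or the deadline passes
  def waitForDriver(host: String, port: Int, timeoutMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var connected = false
    while (!connected && System.currentTimeMillis() < deadline) {
      try {
        val socket = new Socket()
        socket.connect(new InetSocketAddress(host, port), 1000)
        socket.close()
        connected = true
      } catch {
        case _: java.io.IOException => Thread.sleep(100) // driver not up yet, retry shortly
      }
    }
    connected
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical address; in the real code the driver address comes from the launcher arguments
    println(waitForDriver("localhost", 7077, 5000))
  }
}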

registerAM

This method handles registering the ApplicationMaster with the ResourceManager. Through it, the ApplicationMaster is tied together with YarnRMClient and YarnAllocator. It is defined as follows:

private def registerAM(
    _sparkConf: SparkConf,
    _rpcEnv: RpcEnv,
    driverRef: RpcEndpointRef,
    uiAddress: Option[String],
    securityMgr: SecurityManager) = {
  val appId = client.getAttemptId().getApplicationId().toString()
  val attemptId = client.getAttemptId().getAttemptId().toString()
  val historyAddress =
    _sparkConf.get(HISTORY_SERVER_ADDRESS)
      .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
      .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
      .getOrElse("")
  val driverUrl = RpcEndpointAddress(
    _sparkConf.get("spark.driver.host"),
    _sparkConf.get("spark.driver.port").toInt,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
    dummyRunner.launchContextDebugInfo()
  }
  allocator = client.register(driverUrl,
    driverRef,
    yarnConf,
    _sparkConf,
    uiAddress,
    historyAddress,
    securityMgr,
    localResources)
  allocator.allocateResources()
  // In client mode, runExecutorLauncher joins on this reporter thread
  reporterThread = launchReporterThread()
}

This method mainly gathers the various parameters and then calls the register method of client (a YarnRMClient) to perform the registration (nominally it registers the application with the ResourceManager, but in my view it is really registering the driver), and it also starts the reporter thread.
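
One of the gathered parameters is driverUrl, which RpcEndpointAddress renders as spark://<endpointName>@<host>:<port>, the address executors later use to reach the driver's CoarseGrainedScheduler endpoint. The sketch below assembles that string by hand with made-up host and port values; treat the exact format as an assumption rather than a use of the real RpcEndpointAddress class.

object DriverUrlDemo {
  def driverUrl(host: String, port: Int, endpointName: String): String =
    s"spark://$endpointName@$host:$port"

  def main(args: Array[String]): Unit = {
    // CoarseGrainedSchedulerBackend.ENDPOINT_NAME is "CoarseGrainedScheduler" in this Spark line
    println(driverUrl("driver-host.example.com", 43210, "CoarseGrainedScheduler"))
    // prints spark://CoarseGrainedScheduler@driver-host.example.com:43210
  }
}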

launchReporterThread

Creates and starts the reporter thread. It is defined as follows:

private def launchReporterThread(): Thread = {
  // The number of failures in a row until Reporter thread give up
  // Read the configured maximum number of consecutive reporter-thread failures
  val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
  val t = new Thread {
    override def run() {
      var failureCount = 0
      while (!finished) {
        try {
          // If the number of failed executors reported by the allocator has exceeded the
          // configured maximum, terminate the application (i.e. stop the user class)
          if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
            finish(FinalApplicationStatus.FAILED,
              ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
              s"Max number of executor failures ($maxNumExecutorFailures) reached")
          } else {
            // Ask the YarnAllocator to allocate resources
            logDebug("Sending progress")
            allocator.allocateResources()
          }
          failureCount = 0
        } catch {
          // Different exceptions are handled differently; the interrupt may come from finish()
          case i: InterruptedException => // do nothing
          case e: ApplicationAttemptNotFoundException =>
            failureCount += 1
            logError("Exception from Reporter thread.", e)
            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
              e.getMessage)
          // If the reporter thread has failed more times in a row than the configured maximum,
          // terminate the user class
          case e: Throwable =>
            failureCount += 1
            if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                s"$failureCount time(s) from Reporter thread.")
            } else {
              logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
            }
        }
        try {
          val numPendingAllocate = allocator.getPendingAllocate.size
          var sleepStart = 0L
          var sleepInterval = 200L // ms
          allocatorLock.synchronized {
            // Compute how long the reporter should sleep, based on whether there is pending work
            sleepInterval =
              if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
                // Reaching here means there are pending container allocations or pending
                // container-loss-reason requests
                val currentAllocationInterval =
                  math.min(heartbeatInterval, nextAllocationInterval)
                nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
                currentAllocationInterval
              } else {
                nextAllocationInterval = initialAllocationInterval
                heartbeatInterval
              }
            sleepStart = System.currentTimeMillis()
            allocatorLock.wait(sleepInterval)
          }
          val sleepDuration = System.currentTimeMillis() - sleepStart
          // If this condition holds, the allocatorLock.wait above did not sleep long enough,
          // so fall back to Thread.sleep for the remainder
          if (sleepDuration < sleepInterval) {
            // log when sleep is interrupted
            logDebug(s"Number of pending allocations is $numPendingAllocate. " +
              s"Slept for $sleepDuration/$sleepInterval ms.")
            // if sleep was less than the minimum interval, sleep for the rest of it
            val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
            if (toSleep > 0) {
              logDebug(s"Going back to sleep for $toSleep ms")
              // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
              // by the methods that signal allocatorLock because this is just finishing the min
              // sleep interval, which should happen even if this is signalled again.
              Thread.sleep(toSleep)
            }
          } else {
            logDebug(s"Number of pending allocations is $numPendingAllocate. " +
              s"Slept for $sleepDuration/$sleepInterval.")
          }
        } catch {
          case e: InterruptedException =>
        }
      }
    }
  }
  // setting to daemon status, though this is usually not a good idea.
  t.setDaemon(true)
  t.setName("Reporter")
  t.start()
  logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
    s"initial allocation : $initialAllocationInterval) intervals")
  t
}

This method looks complicated, but its main job is simply to request resources from the YarnAllocator (by calling allocateResources). The rest of the code decides whether resources still need to be requested and when the next request should happen.
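
The "when" part is the sleep-interval computation inside allocatorLock.synchronized. The sketch below pulls that logic out on its own: while work is pending, the interval doubles each round, capped at the heartbeat interval, and resets once nothing is pending. The two interval values use what I understand to be the defaults of spark.yarn.scheduler.heartbeat.interval-ms and spark.yarn.scheduler.initial-allocation.interval; verify against your configuration.

object ReporterIntervalDemo {
  def main(args: Array[String]): Unit = {
    val heartbeatInterval = 3000L        // assumed default of the heartbeat interval, in ms
    val initialAllocationInterval = 200L // assumed default of the initial allocation interval
    var nextAllocationInterval = initialAllocationInterval

    // Mirrors the branch inside allocatorLock.synchronized
    def nextSleep(pending: Int): Long =
      if (pending > 0) {
        val current = math.min(heartbeatInterval, nextAllocationInterval)
        nextAllocationInterval = current * 2 // back off while work is still pending
        current
      } else {
        nextAllocationInterval = initialAllocationInterval
        heartbeatInterval
      }

    // Pending work for four rounds, then none: prints List(200, 400, 800, 1600, 3000)
    println(Seq(5, 5, 5, 5, 0).map(nextSleep))
  }
}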

startUserApplication

Starts the user application, which really means starting the class the user passed in via the --class argument. It is defined as follows:

private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")
  // Get the user's classpath
  val classpath = Client.getUserClasspath(sparkConf)
  val urls = classpath.map { entry =>
    new URL("file:" + new File(entry.getPath()).getAbsolutePath())
  }
  val userClassLoader =
    if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
      new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    } else {
      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    }
  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
    // TODO(davies): add R dependencies here
  }
  // Look up the user class's main method
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])
  val userThread = new Thread {
    override def run() {
      try {
        // Run the user class's main method in this new thread
        mainMethod.invoke(null, userArgs.toArray)
        // Mark the application as finished (the user class completed successfully)
        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
        logDebug("Done running users class")
      } catch {
        // Depending on the cause of the exception, mark the user class's run as failed.
        // The cause may be:
        //   - an interrupt
        //   - a SparkUserAppException (the app exited with a non-zero status)
        //   - any other exception thrown by the user code
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: " + cause, cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + cause)
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  // Set the thread's context class loader and name, then start it
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}

First, the code makes one point clear: the user program is, in effect, the driver. As for what this method does, in short it obtains the classpath, builds a class loader, loads the user class, and invokes the user class's main method.
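
As a standalone illustration of that load-and-invoke step, the following sketch builds a URLClassLoader over a user jar, looks up the static main(String[]) method by reflection, and runs it on a thread named "Driver". The jar path and class name are hypothetical.

import java.io.File
import java.net.{URL, URLClassLoader}

object UserMainDemo {
  def main(args: Array[String]): Unit = {
    val userJar = new File("/tmp/user-app.jar") // hypothetical user jar
    val loader = new URLClassLoader(Array[URL](userJar.toURI.toURL), getClass.getClassLoader)
    // Load the (hypothetical) user class and look up its static main(String[]) method
    val mainMethod = loader.loadClass("com.example.Main")
      .getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
      override def run(): Unit = {
        // null receiver because main is static; failures surface as InvocationTargetException
        mainMethod.invoke(null, Array.empty[String])
      }
    }
    userThread.setContextClassLoader(loader)
    userThread.setName("Driver")
    userThread.start()
    userThread.join()
  }
}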