spark 2.11 YarnRMClient

This article is a study of the org.apache.spark.deploy.yarn.YarnRMClient source code. The Spark version is 2.11.

Overview

YarnRMClient is mainly responsible for registering and unregistering the application master with the YARN ResourceManager.

Main Methods

register

This method is straightforward: it registers the application master with the YARN ResourceManager. It is called from ApplicationMaster's registerAM method. The implementation:

def register(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    uiAddress: Option[String],
    uiHistoryAddress: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]
    ): YarnAllocator = {
  // Create the AMRMClient via its own factory method, initialize it with the
  // YARN configuration, and start it
  amClient = AMRMClient.createAMRMClient()
  amClient.init(conf)
  amClient.start()
  this.uiHistoryAddress = uiHistoryAddress
  val trackingUrl = uiAddress.getOrElse {
    if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
  }
  logInfo("Registering the ApplicationMaster")
  // Register the application master with the ResourceManager. The code shows
  // that the application master host is the local host: this code runs inside
  // the AM process, so Utils.localHostName() is the AM's own host
  synchronized {
    amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
    registered = true
  }
  // Create the YarnAllocator.
  // driverUrl and driverRef deserve a note:
  // driverUrl is the address the driver runs at; it is passed on to Executors,
  //   presumably so that Executors can communicate with the driver
  // driverRef is used inside YarnAllocator to sync executor ids and to send
  //   executor-removal messages
  new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
    localResources, new SparkRackResolver())
}

The logic here is simple and readable at a glance: create an AMRMClient (used to talk to the ResourceManager), register the application master with the ResourceManager, and create a YarnAllocator. Pay attention, though, to the arguments passed when constructing the YarnAllocator.
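One detail worth isolating is the tracking-URL fallback at the start of register: the live UI address wins, and the history server address is used only when the configuration allows it. A minimal sketch in plain Scala (the function and parameter names here are mine, not Spark's):

```scala
// Sketch of register's tracking-URL selection: prefer the live UI
// address; otherwise fall back to the history server address only
// when history-server tracking is allowed, else use an empty URL.
object TrackingUrl {
  def choose(
      uiAddress: Option[String],
      uiHistoryAddress: String,
      allowHistoryTracking: Boolean): String = {
    uiAddress.getOrElse {
      if (allowHistoryTracking) uiHistoryAddress else ""
    }
  }
}
```

In Spark the `allowHistoryTracking` flag corresponds to the ALLOW_HISTORY_SERVER_TRACKING_URL config entry read from sparkConf.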

unregister

The opposite of register: it unregisters the application master from the YARN ResourceManager. The implementation:

def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
  if (registered) {
    amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
  }
}
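Note the guard: unregister is synchronized and only acts when the registered flag was set by register. The pattern can be sketched in plain Scala (hypothetical names; a counter stands in for the real amClient.unregisterApplicationMaster call):

```scala
// Sketch of unregister's guard: the unregister call is a no-op unless
// registration succeeded first, and synchronization makes the flag
// check safe when called from multiple threads.
class RegistrationGuard {
  private var registered = false
  var unregisterCalls = 0 // stand-in for amClient.unregisterApplicationMaster

  def register(): Unit = synchronized {
    registered = true
  }

  def unregister(): Unit = synchronized {
    if (registered) {
      unregisterCalls += 1 // Spark would call the AMRMClient here
    }
  }
}
```

Calling unregister before register does nothing, which is exactly what you want when the AM fails before registration completes.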

getMaxRegAttempts

This method determines the maximum number of attempts to register the application master. The definition:

/** Returns the maximum number of attempts to register the AM, read from both the Spark
 *  and the YARN configuration. If the Spark configuration sets a value, the smaller of
 *  the two values is used. */
def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
  val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
  val yarnMaxAttempts = yarnConf.getInt(
    YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
  sparkMaxAttempts match {
    case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
    case None => yarnMaxAttempts
  }
}

This method is also simple: it reads the value from both the Spark and the YARN configuration. If the Spark configuration sets it, the smaller of the two values is used; if not, the YARN value is used as-is.
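The selection rule can be decoupled from the SparkConf/YarnConfiguration classes and written as a small pure function (a sketch with names of my own choosing):

```scala
// Sketch of getMaxRegAttempts' selection rule: if Spark sets a
// maximum, take the smaller of the Spark and YARN values; otherwise
// fall back to YARN's RM_AM_MAX_ATTEMPTS value.
object MaxAttempts {
  def resolve(sparkMaxAttempts: Option[Int], yarnMaxAttempts: Int): Int =
    sparkMaxAttempts match {
      case Some(x) => math.min(x, yarnMaxAttempts)
      case None    => yarnMaxAttempts
    }
}
```

So a Spark setting can only tighten the limit, never raise it above what the YARN cluster allows.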

Questions

Where is the YarnRMClient object created?

The answer is in the main method of ApplicationMaster. The code:

def main(args: Array[String]): Unit = {
  ...
  SparkHadoopUtil.get.runAsSparkUser { () =>
    master = new ApplicationMaster(amArgs, new YarnRMClient)
    System.exit(master.run())
  }
}

Where is YarnRMClient's register method called?

We saw the YarnAllocator being created inside the register method, but where is register itself called? The answer is org.apache.spark.deploy.yarn.ApplicationMaster, which also contains the main method, i.e. the entry point of the program. The code:

private def registerAM(
    _sparkConf: SparkConf,
    _rpcEnv: RpcEnv,
    driverRef: RpcEndpointRef,
    uiAddress: Option[String],
    securityMgr: SecurityManager) = {
  ...
  allocator = client.register(driverUrl,
    driverRef,
    yarnConf,
    _sparkConf,
    uiAddress,
    historyAddress,
    securityMgr,
    localResources)
  allocator.allocateResources()
  reporterThread = launchReporterThread()
}