spark 2.11 YarnAllocator

本文是对 org.apache.spark.deploy.yarn.YarnAllocator 类源码进行学习的分析,spark的版本为2.11。

总体概述

YarnAllocator可以理解成一个Container的筛选器。当调用了YarnAllocator.allocateResources()方法后,程序就会进行各种处理,最终调用ExecutorRunnable类来启动Executor。在YarnAllocator类中,最主要的方法有:allocateResources()、updateResourceRequests()、handleAllocatedContainers()、runAllocatedContainers()和processCompletedContainers()。而整个这些方法的调用,是通过allocateResources()来调用的。基本的流程如下图:

主要方法的分析

allocateResources

资源分配的入口,首先看方法的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def allocateResources(): Unit = synchronized {
updateResourceRequests()
// 处理指示器
val progressIndicator = 0.1f
// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
// requests.
// 调用 AMRMClient 分配资源
val allocateResponse = amClient.allocate(progressIndicator)
// 得到已经分配的 container
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
// 输出日志信息,包括 分配的container数量,正在运行的以及启动的executor数量,以及可用的资源信息
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
numExecutorsRunning.get,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))
// 处理已经分配的container
handleAllocatedContainers(allocatedContainers.asScala)
}
// 获取已经执行完成的 container
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
//处理已经完成的container
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning.get))
}
}

这个方法首先就是要更新资源的申请(调用updateResourceRequests()方法,我们稍后再看),然后就是调用AMRMClient(amClient)来分配资源,分配资源的返回值(allocateResponse)会包含三部分信息:已经分配的Container(allocatedContainers)、可用的资源和完成的Container(completedContainers)。对于已经分配的和完成的Container,会有对应的方法去处理;对于可用的资源,只是输出到日志。

updateResourceRequests

更新资源的请求信息,首先看方法的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def updateResourceRequests(): Unit = {
// 得到正在添加的container 并 得到正在添加 container的数量
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
// 计划要启动的executor数量 - 正在请求启动的executor - 已经启动的executor - 已经运行任务的executor = 缺少多少executor
// 所以这里是在计算 比预计要启动的executor,还缺少多少个
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - numExecutorsRunning.get
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
s"executorsStarting: ${numExecutorsStarting.get}")
// <1>
// missing 可以为正数 也可能为负数,负数则说明 动态分配分配多了,但是没有超过最大个数,这个数是通过计划启动个数算出来的
if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
// 将要添加的container请求拆分到三个组:位置匹配列表、位置不匹配列表 和 无位置列表。
// 对于位置匹配的 container 请求,将他们放到可用的地方,等待分配
// 对于位置不匹配的和无位置的container请求,取消这些container的请求,因为 位置优先权已经变了,
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
// <2>
// cancel "stale" requests for locations that are no longer needed
// 对于位置不匹配的container,进行移除操作,并记录日志, 移除了N个container
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
if (cancelledContainers > 0) {
logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
}
// <3>
// 计算还可以分配的container的数量,就是 缺少的 + 取消的
// 因为cancelledContainers的个数实际上就是从pendingAllocate 中取消的
val availableContainers = missing + cancelledContainers
// 计算潜在的container就是 可以分配的container + 上面那些 不限制位置的的contianer的数量
val potentialContainers = availableContainers + anyHostRequests.size
// TODO 这是在弄啥 ??? 应该是根据 潜在container的数量,生成对应个的contaner的位置引用
val containerLocalityPreferences = if (labelExpression.isEmpty) {
containerPlacementStrategy.localityOfRequestedContainers(
potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
allocatedHostToContainersMap, localRequests)
} else {
Array.empty[ContainerLocalityPreferences]
}
// 根据上面的containerLocalityPreferences 创建container添加请求(注意是请求)
// newLocalityRequest 用来记录要添加的container的请求
val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
containerLocalityPreferences.foreach {
// createContainerRequest 用来生成container的创建请求
case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
newLocalityRequests += createContainerRequest(resource, nodes, racks)
case _ =>
}
// 需要再次判断 总的container的数量,如果availableContainers > newLocalityRequests 表示,还不够
// 为啥 availableContainers 会大于 newLocalityRequests ? 因为 labelExpression.isEmpty 为空时,会生成一个空的Array
if (availableContainers >= newLocalityRequests.size) {
// more containers are available than needed for locality, fill in requests for any host
for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
// createContainerRequest 用来生成container的创建请求
newLocalityRequests += createContainerRequest(resource, null, null)
}
} else {
// 这里的 newLocalityRequests 实际上对应的个是 potentialContainers = availableContainers + anyHostRequests.size
// 因此,如果 newLocalityRequests > availableContainers 则表示生成多了,且 anyHostRequests 的多了
val numToCancel = newLocalityRequests.size - availableContainers
// 因此释放到一些多余的 anyHostRequests
// cancel some requests without locality preferences to schedule more local containers
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
if (numToCancel > 0) {
logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
}
}
// <4>
// AMRMClient 请求添加container
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
}
// <5>
if (log.isInfoEnabled()) {
val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
if (anyHost.nonEmpty) {
logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
}
localized.foreach { request =>
logInfo(s"Submitted container request for host ${hostStr(request)}.")
}
}
// <6>
} else if (numPendingAllocate > 0 && missing < 0) {
// 如果不缺少executor,并且还有正在添加的executor
// 计算要取消的contaner的数量,为什么是最小值,个人这样理解:-missing,其实是多出来的,但是在计算missing的时候,
// 已经减去 numPendingAllocate 了,也就是说 numPendingAllocate 认为是已经使用的数量
// 因此,如果取最大值,那么当 numPendingAllocate > -missing 时,删除的container就太多了
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")
// 获取匹配的请求,并且有匹配的数据,则移除这些container(而且我猜测,这里的寻找匹配的请求,是在pending的队列中找的)
val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.iterator().next().asScala
.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
// <7>
}
}

<1> 部分主要就是计算系统是否已经达到了目标个数的executor(container是executor的容器,目前一个container只包含了一个executor),并计算缺少的个数。计算的公式就是val missing = targetNumExecutors - numPendingAllocate - numExecutorsStarting.get - numExecutorsRunning.get。targetNumExecutors是配置中 spark.dynamicAllocation.minExecutors 、spark.dynamicAllocation.initialExecutors 以及 spark.executor.instances 三个配置中的最大值(开启了动态分配的话,如果没开启,则使用 spark.executor.instances 的值),并且要求 spark.dynamicAllocation.maxExecutors >= spark.dynamicAllocation.initialExecutors >= spark.dynamicAllocation.minExecutors(如果没有开启动态分配,则不存在这种要求)。这里涉及了4个配置项。numPendingAllocate的值调用AMRMClient的getMatchingRequests方法,获取location为*的所有请求。

<2> 这一部分就是对添加中的请求(pendingAllocate)进行分类,分为位置自由的、位置匹配的和位置不匹配的。分类的依据是ContainerRequest.getNodes。如果nodes为空,则认为是位置自由的,如果nodes不空,则判断nodes是否hostToLocalTaskCounts的keyset有交集,如果有则认为是匹配的,否则不匹配。这里的匹配和不匹配,大概就是寻找本地的containerRequest(????????????????)。

<3> 这一部分是对上面找出来的位置不匹配的请求,进行取消,并记录日志。筛选的规则是优先使用位置符合的,坚决不使用位置不符合的,数量不够的时候,使用位置自由的。

<4> 这一部分是计算出需要创建的container个数。进行ContainerRequest的创建,首先创建位置符合的,然后创建位置自由的,并且将多余的位置自由的请求取消掉。

<5> 在这里调用AMRMClient的addContainerRequest方法来增加ContainerRequest。

<6> 记录日志

<7> 将超出目标个数的container(位置自由的)取消。
所以从总体来看,这个方法其实就是把container的个数控制在目标个数范围内,如果缺少了,则增加,如果多了,则取消一些请求。

handleAllocatedContainers

此方法用来处理申请资源的container。方法的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
// 用来保存要使用的Containers
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host 根据host进行匹配,匹配不成功的下面继续匹配
val remainingAfterHostMatches = new ArrayBuffer[Container]
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
// Match remaining by rack 对上面按照host匹配不成功的进行机架匹配
val remainingAfterRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
// 通过上面的两次匹配可以看到,最优先使用的是 host -> rack -> *
// Assign remaining that are neither node-local nor rack-local
val remainingAfterOffRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterRackMatches) {
matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
remainingAfterOffRackMatches)
}
// 三次匹配都没有成功的,就释放掉了
if (!remainingAfterOffRackMatches.isEmpty) {
logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
s"allocated to us")
for (container <- remainingAfterOffRackMatches) {
internalReleaseContainer(container)
}
}
// 启动所有的container,这些container上经过上面过滤后的可以使用container
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}

从代码来看,这个方法其实就是对allocatedContainers再次进行过滤,根据主机、机架 和 通配主机(*)。最后将不匹配的释放掉(不同于取消请求)。并在方法的最后启动所有分配的container启动。

runAllocatedContainers

运行 container,方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
// executorIdConter 应该是同步的,用来生成新的executor的id
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId")
//<1>
// 主要是为了更新各种计数
def updateInternalState(): Unit = synchronized {
// 正在运行的executor加一
numExecutorsRunning.incrementAndGet()
// 启动中的executor 减一,因为已经运行了,就不能处于启动中了
numExecutorsStarting.decrementAndGet()
//保存 executor -> container的关系 以及 container -> executorId的关系
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
// 记录 某个主机上都有哪些 containers
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
containerSet += containerId
// 记录contain运行在那个主机上
allocatedContainerToHostMap.put(containerId, executorHostname)
}
// <2>
// 如果正在运行的executor的数量小于 想要启动的executor个数
if (numExecutorsRunning.get < targetNumExecutors) {
// 将启动的executor的计数加一
// 这里为啥要加一呢???
// 因为 接下来要启动 container了,没有启动起来之前则认为处于启动中,因此先加一,一旦启动完成,则调用上面的updateInternalState
// 方法,在这个方法中会对运行中的executor加一,启动中的executor减一。
numExecutorsStarting.incrementAndGet()
// spark.yarn.launchContainers 配置的值
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
// 调用 ExecutorRunnalbe方法来启动container
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
// 跟新各种计数
updateInternalState()
} catch {
case e: Throwable =>
numExecutorsStarting.decrementAndGet()
if (NonFatal(e)) {
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately
// to avoid unnecessary resource occupation.
// 启动失败,则释放掉这个container
amClient.releaseAssignedContainer(containerId)
} else {
throw e
}
}
}
})
} else {
// For test only
updateInternalState()
}
} else {
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
"reached target Executors count: %d.").format(
numExecutorsRunning.get, targetNumExecutors))
}
}
// <3>
}

<1> 获取container和executor的信息,方便下面使用。

<2> 定义了一个方法,这个方法就是用来更新各种数据关系,有executor运行的数量、executor启动的数量、executorId到container的对应关系、containerId到executorId的对应关系、host上运行的contianer的对应关系、container在哪个host运行的关系。

<3> 异步生成ExecutorRunnable,并启动。

processCompletedContainers

此方法也在allocateResources()方法中调用,用来处理完成的container,完成的container可能是被kill掉,也可能是正常完成的。方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
// 循环完成的container
for (completedContainer <- completedContainers) {
// 获取containerId, 判断container是否在releasedContainers 中,获取 container所在的host
val containerId = completedContainer.getContainerId
val alreadyReleased = releasedContainers.remove(containerId)
val hostOpt = allocatedContainerToHostMap.get(containerId)
val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
// <1>
// 如果alreadyReleased为false,则表示releasedContainers 中没有这个container,或者是有但是删除失败
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,
completedContainer.getState,
completedContainer.getExitStatus))
// 获取container的退出状态
val exitStatus = completedContainer.getExitStatus
// 根据退出状态来判断是否是由于Application的原因
// 以及退出的原因
// ContainerExitStatus.SUCCESS 表示是因为YARN事件,不是因为运行的job发生error而导致的
// ContainerExitStatus.PREEMPTED 表示主机上的端口被占用了
// VMEM_EXCEEDED_EXIT_CODE 表示 虚拟内存的问题
// PMEM_EXCEEDED_EXIT_CODE 表示 物理内存的问题
// 否则
// 除了第一种和第二种,其他的认为都属于 Application的原因
val (exitCausedByApp, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
(false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
"pre-emption) and not because of an error in the running job.")
case ContainerExitStatus.PREEMPTED =>
// Preemption is not the fault of the running tasks, since YARN preempts containers
// merely to do resource sharing, and tasks that fail due to preempted executors could
// just as easily finish on any other executor. See SPARK-8167.
(false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))
case PMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case _ =>
// Enqueue the timestamp of failed executor
failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
}
// 如果是由于Application的原因退出的,则以警告记录日志,否则以info级别记录日志
if (exitCausedByApp) {
logWarning(containerExitReason)
} else {
logInfo(containerExitReason)
}
// 生成一个ExecutorExited 对象并返回给变量
ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
} else {
// 如果 alreadyReleased 为true,则表示 container已经被kill掉了,在 internalReleaseContainer 方法中会操作
ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
s"Container $containerId exited from explicit termination request.")
}
// <2>
for {
host <- hostOpt
// 根据host获取上面所运行的container
containerSet <- allocatedHostToContainersMap.get(host)
} {
// 将container从host所包含的container集合中删除,这样host上的container集合就含有这个container了
containerSet.remove(containerId)
// 删除完成后,如果 container 集合列表空了,则说明 host上只运行了这一个contianer,则删除host与container的对应关系,否则就更新一下
if (containerSet.isEmpty) {
allocatedHostToContainersMap.remove(host)
} else {
allocatedHostToContainersMap.update(host, containerSet)
}
// 将container 到 host的映射关系也删除
allocatedContainerToHostMap.remove(containerId)
}
// 移除 container到executor的对应关系
containerIdToExecutorId.remove(containerId).foreach { eid =>
// 将executor到container的对应关系也删除
executorIdToContainer.remove(eid)
// 从 pendingLossReasonRequests 移除 executor
pendingLossReasonRequests.remove(eid) match {
case Some(pendingRequests) =>
// Notify application of executor loss reasons so it can decide whether it should abort
pendingRequests.foreach(_.reply(exitReason))
case None =>
releasedExecutorLossReasons.put(eid, exitReason)
}
// <3>
// 如果没有被kill掉
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
// Notify backend about the failure of the executor
// container非异常释放的计数加一
numUnexpectedContainerRelease += 1
// 发送删除 execurot的请求
driverRef.send(RemoveExecutor(eid, exitReason))
}
// <4>
}
}
}

<1> 获取containerId, 判断container是否在releasedContainers 中,获取 container所在的host,以便后面使用。

<2> 分析contianer退出的原因,退出的类型分为 App引发的和非App引发的。具体的判断,可以看代码和注释。

<3> 主要为了清理container各种关系的保存信息。

<4> 使用Netty RPC发送移除Executor的信息。

总结

从上面的代码的调用过程以及实现我们可以看出,YarnAllocator实际上类似一个过滤器,它会从Resource Manager那里申请资源(通过AMRMClient获取),对获取到资源按照host -> stack -> any的顺序筛选container,并将合适的container启动后,反馈给调用者。
这个类功能很简单,但是在整个集群中又比较不太好理解,首先需要确定的是,YarnAllocator自己不管理资源(不对资源进行操作),只是筛选,虽然也会有释放、取消操作,但是这些操作都是调用Resource Manager的api来完成的。
另外,YarnAllocator是运行在driver上的,由AM来调用(?????需要再次确认),因此,如果每个application都会有自己的YarnAllocator,它只是为自己的job的运行筛选container,而不是全局为所有的application统一筛选。

AMRMClient的一些方法

方法名 作用
AllocateResponse allocate(float progressIndicator) 请求额外的container并接收新的container的分配。
List<? extends Collection> getMatchingRequests(Priority priority, String resourceName, Resource capability) 获取与给定参数匹配的未完成的请求。
void removeContainerRequest(T req) 移除之前的container请求。
void addContainerRequest(T req) 在调用allocate之前为container申请资源。
void releaseAssignedContainer(ContainerId containerId) 释放由Resource Manager分配的container。
RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) 注册application Master

一些问题

YarnAllocator是什么时候生成的

是在YarnRMClient的register方法中,向AMRMClient注册ApplicationMaster(调用AMRMClient.registerApplicationMaster方法)完成之后生成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def register(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}
logInfo("Registering the ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
registered = true
}
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
}