本文是对 org.apache.spark.deploy.yarn.YarnAllocator 类源码进行学习的分析,spark的版本为2.11。
总体概述
YarnAllocator可以理解成一个Container的筛选器。当调用了YarnAllocator.allocateResources()方法后,程序就会进行各种处理,最终调用ExecutorRunnable类来启动Executor。在YarnAllocator类中,最主要的方法有:allocateResources()、updateResourceRequests()、handleAllocatedContainers()、runAllocatedContainers()和processCompletedContainers()。而整个这些方法的调用,是通过allocateResources()来调用的。基本的流程如下图:
主要方法的分析
allocateResources
资源分配的入口,首先看方法的定义
这个方法首先就是要更新资源的申请(调用updateResourceRequests()方法,我们稍后再看),然后就是调用AMRMClient(amClient)来分配资源,分配资源的返回值(allocateResponse)会包含三部分信息:已经分配的Container(allocatedContainers)、可用的资源和完成的Container(completedContainers)。对于已经分配的和完成的Container,会有对应的方法去处理;对于可用的资源,只是输出到日志。
updateResourceRequests
更新资源的请求信息,首先看方法的定义
<1> 部分主要就是计算系统是否已经达到了目标个数的executor(container是executor的容器,目前一个container只包含了一个executor),并计算缺少的个数。计算的公式就是val missing = targetNumExecutors - numPendingAllocate - numExecutorsStarting.get - numExecutorsRunning.get。targetNumExecutors是配置中 spark.dynamicAllocation.minExecutors 、spark.dynamicAllocation.initialExecutors 以及 spark.executor.instances 三个配置中的最大值(开启了动态分配的话,如果没开启,则使用 spark.executor.instances 的值),并且要求 spark.dynamicAllocation.maxExecutors >= spark.dynamicAllocation.initialExecutors >= spark.dynamicAllocation.minExecutors(如果没有开启动态分配,则不存在这种要求)。这里涉及了4个配置项。numPendingAllocate的值调用AMRMClient的getMatchingRequests方法,获取location为*的所有请求。1>
<2> 这一部分就是对添加中的请求(pendingAllocate)进行分类,分为位置自由的、位置匹配的和位置不匹配的。分类的依据是ContainerRequest.getNodes。如果nodes为空,则认为是位置自由的,如果nodes不空,则判断nodes是否hostToLocalTaskCounts的keyset有交集,如果有则认为是匹配的,否则不匹配。这里的匹配和不匹配,大概就是寻找本地的containerRequest(????????????????)。2>
<3> 这一部分是对上面找出来的位置不匹配的请求,进行取消,并记录日志。筛选的规则是优先使用位置符合的,坚决不使用位置不符合的,数量不够的时候,使用位置自由的。3>
<4> 这一部分是计算出需要创建的container个数。进行ContainerRequest的创建,首先创建位置符合的,然后创建位置自由的,并且将多余的位置自由的请求取消掉。4>
<5> 在这里调用AMRMClient的addContainerRequest方法来增加ContainerRequest。5>
<6> 记录日志6>
<7> 将超出目标个数的container(位置自由的)取消。
所以从总体来看,这个方法其实就是把container的个数控制在目标个数范围内,如果缺少了,则增加,如果多了,则取消一些请求。7>
handleAllocatedContainers
此方法用来处理申请资源的container。方法的定义如下
从代码来看,这个方法其实就是对allocatedContainers再次进行过滤,根据主机、机架 和 通配主机(*)。最后将不匹配的释放掉(不同于取消请求)。并在方法的最后启动所有分配的container启动。
runAllocatedContainers
运行 container,方法定义如下
<1> 获取container和executor的信息,方便下面使用。1>
<2> 定义了一个方法,这个方法就是用来更新各种数据关系,有executor运行的数量、executor启动的数量、executorId到container的对应关系、containerId到executorId的对应关系、host上运行的contianer的对应关系、container在哪个host运行的关系。2>
<3> 异步生成ExecutorRunnable,并启动。3>
processCompletedContainers
此方法也在allocateResources()方法中调用,用来处理完成的container,完成的container可能是被kill掉,也可能是正常完成的。方法定义如下
<1> 获取containerId, 判断container是否在releasedContainers 中,获取 container所在的host,以便后面使用。1>
<2> 分析contianer退出的原因,退出的类型分为 App引发的和非App引发的。具体的判断,可以看代码和注释。2>
<3> 主要为了清理container各种关系的保存信息。3>
<4> 使用Netty RPC发送移除Executor的信息。4>
总结
从上面的代码的调用过程以及实现我们可以看出,YarnAllocator实际上类似一个过滤器,它会从Resource Manager那里申请资源(通过AMRMClient获取),对获取到资源按照host -> stack -> any的顺序筛选container,并将合适的container启动后,反馈给调用者。
这个类功能很简单,但是在整个集群中又比较不太好理解,首先需要确定的是,YarnAllocator自己不管理资源(不对资源进行操作),只是筛选,虽然也会有释放、取消操作,但是这些操作都是调用Resource Manager的api来完成的。
另外,YarnAllocator是运行在driver上的,由AM来调用(?????需要再次确认),因此,如果每个application都会有自己的YarnAllocator,它只是为自己的job的运行筛选container,而不是全局为所有的application统一筛选。
AMRMClient的一些方法
| 方法名 | 作用 |
|---|---|
| AllocateResponse allocate(float progressIndicator) | 请求额外的container并接收新的container的分配。 |
| List<? extends Collection |
获取与给定参数匹配的未完成的请求。 |
| void removeContainerRequest(T req) | 移除之前的container请求。 |
| void addContainerRequest(T req) | 在调用allocate之前为container申请资源。 |
| void releaseAssignedContainer(ContainerId containerId) | 释放由Resource Manager分配的container。 |
| RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) | 注册application Master |
一些问题
YarnAllocator是什么时候生成的
是在YarnRMClient的register方法中,向AMRMClient注册ApplicationMaster(调用AMRMClient.registerApplicationMaster方法)完成之后生成的。