本文是对 org.apache.spark.deploy.yarn.ApplicationMaster 源码进行学习的分析,spark的版本为2.11。
概述
ApplicationMaster 可以说是运行用户程序的入口类。该类的一些行为有解析用户参数、启动dirver、建立与driver的通信、启动reporter线程、启动用户类等。
主要方法分析
ApplicationMaster伴生类的main方法
该方法是ApplicationMaster的启动入口方法,方法定义
从代码可以看出,该方法就是类的启动方法,注册日志、解析传入参数(如果参数传递了properties文件,则将properties中的配置加载到系统中),最后以特殊的用户身份启动ApplicationMaster(调用run方法)。
run
该方法用来启动applicationMaster,方法定义如下
<1> 这部分用来设置一些系统属性,以便后面使用。1>
<2> 设置CurrentContext以及钩子方法,以便在sparkContext销毁之后进行清理操作。钩子函数实际上是交给了 SparkShutdownHookManager 对象进行处理。2>
<3> 进行安全方面的一些设置,需要后续仔细看。3>
<4> 进行服务的启动,这里区分是集群模式(cluster)还是客户端模式(client)。其内部执行的总体步骤是一样的,只是每个步骤的做的方式不同。
接下来,先从集群模式来看,然后再看客户端模式。4>
runDriver
运行deiver,只有集群模式,才会执行这个方法,方法定义如下
这个方法比较重要,因此需要详细看看。addAmIpFilter方法,从代码来看是给Spark UI增加IP 过滤器的功能,方法中也会对不同的部署模式有不同的区分,对于集群模式,将过滤器信息(过滤器类和参数)设置到系统参数,而对于client模式,则通过RPC服务发送给了driver。由此可见对于集群模式,driver是运行在本地的,而客户端模式,driver是运行在别处的。运行在哪里呢?另外,addAmIpFilter实际上添加的是 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 这个Filter。
接下来,方法是在独立的线程中启动了用户类(通过–class参数传入的类),启动用户类,其大概的操作就是获取类加载器,利用反射,加载类并最终调用用户类的main方法。
接着,获取SparkContext,并通过runAMEndpoint方法得到 driverEndpint,driverEndpoint作为一个参数来向ResourceManager 注册 ApplicationMaster。
然后就是向 ResourceManager 注册 ApplcationMaster。
最后等待用户类的执行完成。
runExecutorLauncher
runExecutorLauncher方法与上面的rundirver的地位相同,只是针对client模式的启动方式。方法定义如下
此方法相对 rundriver来说就简单多了,首先创建了一个用于连接本机的RpcEnv,然后是等待SparkDriver的启动完成,添加IP Filter(通过amEndpoint发送给driver)。最后向ResourceManager 注册 ApplicationMaster。
registerAM
此方法用来处理向 ResourceManager 注册 ApplicationMaster。通过这个方法,就将ApplicationMaster与YarnRMClient和YarnAllocator联系起来了。方法定义
该方法主要就是获取各种参数,然后调用client(YarnRMClient)的register方法进行注册(说是向ResourceManager注册的application,但以我来看,是注册的driver),还有就是启动reporter线程。
launchReporterThread
用于生成并启动 reporter线程。方法定义如下
该方法看起来很复杂,其实它主要要做的事情就是 向YarnAllocator申请资源(调用allocateResources方法)。其他代码就是判断是否还要申请资源,以及什么时候进行下一次申请。
startUserApplication
启动用户应用程序,其实就是启动用户通过 –class参数传递过来的类。方法定义
首先,通过代码我们可以明确一个问题,那就是用户的程序,其实就是driver。对于这个方法实现的功能,简单来说,就是获取类路径、获取类加载器、实例话用户类、执行用户类的main方法。