启动开始
在Yarn调度模式中,Executor的启动是从YarnAllocator的runAllocatedContainers方法开始的。
以下是这个方法的定义:
此处的执行逻辑是,循环要使用的container信息,如果当前运行的Executor个数小于想要启动(目标)的Executor个数(启动中的不算),那么就让launcherPool启动一个ExecutorRunnable。
ExecutorRunnable的定义
这里需要看一下ExecutorRunnable的定义:
|
|
在ExecutorRunnable的定义中,它需要如下参数:Container、YarnConfiguration、SparkConf、Master地址、executor的id、hostname、executor内存大小、executor的cpu核数、application的id、SecurityManager和本地资源信息。
Runnable的参数确定
这里先看一下YarnAllocator传递的哪些参数
Container
container 来自containersToUse集合中,containerToUse是如下得到的:
也就是说是通过AMRMClient调用AM的服务后分配的。
YarnConfiguration
YarnConfiguration是来自于YarnAllocator的构造参数:
其中第三个参数conf就是YarnConfiguration,这个conf是来自YarnRMClient的register方法:
而register方法是在ApplicationMaster的registerAM方法中调用的:
yarnConf是在ApplicationMaster中定义的:
|
|
|
|
所以,从代码可以看出,yarnConf其实是一个“org.apache.hadoop.conf.Configuration”,它包含三部分内容:1、上面的S3的属性。2、Spark配置中以“spark.hadoop.”开头的配置。3、特定的“io.file.buffer.size”属性。
SparkConf
ExecutorRunnable需要的第三个参数是SparkConf。这个参数也是来自YarnAllocator的构造方法,和YarnConfiguration一样是从YarnRMClient的register方法的参数那里得到的。继续向上找,这个参数是来自ApplicationMaster的registerAM方法,对于registerAM方法,有两个地方调用,而这两个方法传递的SparkConf也是不一样的:driver启动和Executor启动。对于driver启动和Executor的启动,区别是是否是集群模式(isClusterModel),这两个方法只会执行一种:如果是集群模式(isClusterModel==true),则driver启动,否则Executor启动。
driver启动
如果是集群模式(isClusterModel == true),就会调用runDriver方法,在runDriver方法中会调用registerAM方法。这里的SparkConf是从SparkContext中得到的:
对于driver的方式的SparkContext的获取需要之后详细了解。
Executor启动
如果是非集群模式(isClusterModel==false),就会调用runExecutorLauncher方法,在这个方法中会调用registerAM。这里传递的SparkConf就比较直观了,就是在ApplicationMaster类中实例化的SparkConf:
masterAddress
ExecutorRunnable的第四个参数是masterAddress,一个字符串类型参数。但是在yarnAllocator的runAllocatedContainers方法中生成ExecutorRunnable时传递的是“driverUrl”,这个“driverUrl”是来自YarnAllocator的构造方法的“driverUrl”,是来自ApplicationMaster的registerAM方法中生成的:
这里的_sparkConf是registerAM方法的参数,和上面的(第三个参数)SparkConf来源一致。
executorId
ExecutorRunnable的第五个参数是executorId,一个字符串类型的参数。executorId虽然是字符串类型,其实就是一个数字:
而executorIdCounter是从driver那里拿到的:
hostname
ExecutorRunnable的第六个参数是hostname,一个字符串类型的参数。实际上它代表的是executor的hostname,在YarnAllocator的runAllocatedContainers方法中生成:
executorMemory
ExecutorRunnable的第七个参数是executorMemory,一个整型类型的参数。指定了Executor可以使用的内存大小(什么的内存???)。该值是通过参数进行配置的:
从spark.executor.memory中读取,默认值为1GB。
executorCores
ExecutorRunnable的第八个参数是executorCores,一个整型参数。指定了Executor可以使用的cpu核数。该值是通过参数进行配置的:
从spark.executor.cores中读取,默认为1。该值会影响Executor中tasks的并发数。
appId
ExecutorRunnable的第九个参数是appId,一个字符串类型参数。它是通过YarnAllocator的构造参数ApplicationAttemptId对象得到的:
ApplicationAttemptId对象是在YarnRMClient中getAttemptId()方法得到的:
SecurityManager
ExecutorRunnable的第十个参数是SecurityManager对象。它也是通过一系列的方法传递过来的,最开始是在runDriver和runExecutorLauncher,但是这里和SparkConf不一样,这两个方法使用的SecurityManager是一样的。
这两个方法引用的是同一个SecurityManager:
localResources
ExecutorRunnable的最后一个参数是localResources,是一个Map[String, LocalResource]类型集合。localResources在ApplicatinMaster类中进行定义:
localResources加载和缓存的数据我们之后在介绍,这里只是梳理各个参数的定义。
启动
生成ExecutorRunnable,然后调用run方法,就可以启动一个Container(Executor)了。runAllocatedContainers方法中也是调用ExecutorRunnable的run方法来启动ExecutorRunnable的。ExecutorRunnable的run方法如下定义:
在这个方法中,先是创建一个NMClient,用YarnConfiguration进行初始化,并启动。接着调用 startContainer方法启动Container。