Spark-launch-Executor

Where launching starts

In YARN mode, executors are launched starting from YarnAllocator's runAllocatedContainers method.
Here is the definition of this method:

/**
 * Launches executors in the allocated containers.
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    ... // bookkeeping of various counters and per-container state omitted
    if (numExecutorsRunning.get < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(new Runnable {
          override def run(): Unit = {
            try {
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources
              ).run()
              updateInternalState()
            } catch {
              ... // exception handling omitted
            }
          }
        })
      } else {
        // For test only
        updateInternalState()
      }
    } else {
      logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
        "reached target Executors count: %d.").format(
        numExecutorsRunning.get, targetNumExecutors))
    }
  }
}

The logic here is: iterate over the containers to be used, and for each one, if the number of currently running executors is below the target number of executors (executors that are still starting do not count), hand a new ExecutorRunnable to launcherPool to launch it.
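launcherPool is a thread pool held by YarnAllocator, so several containers can be started in parallel. Below is a minimal standalone sketch of this launch pattern, assuming a plain JDK thread pool instead of Spark's internal one and hypothetical container names; it is not Spark source:

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object LaunchPatternSketch {
  def main(args: Array[String]): Unit = {
    val targetNumExecutors = 3
    val numExecutorsRunning = new AtomicInteger(0)
    val launcherPool = Executors.newCachedThreadPool()

    val allocatedContainers = Seq("container_01", "container_02", "container_03", "container_04")
    for (container <- allocatedContainers) {
      if (numExecutorsRunning.get < targetNumExecutors) {
        // Spark tracks a separate numExecutorsStarting counter; folded into one here
        numExecutorsRunning.incrementAndGet()
        launcherPool.execute(new Runnable {
          override def run(): Unit = {
            // in Spark this is where new ExecutorRunnable(...).run() is called
            println(s"launching executor in $container")
          }
        })
      } else {
        println(s"skip $container: running executor count reached the target")
      }
    }
    launcherPool.shutdown()
    launcherPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}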

The definition of ExecutorRunnable

Let's take a look at how ExecutorRunnable is defined:

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging

From this definition, ExecutorRunnable needs the following parameters: the Container, a YarnConfiguration, a SparkConf, the master (driver) address, the executor id, the hostname, the executor memory size, the number of executor CPU cores, the application id, a SecurityManager, and the local resource information.

Determining ExecutorRunnable's parameters

Let's look at which values YarnAllocator passes for each of these parameters.

Container

container comes from the containersToUse collection (the parameter of runAllocatedContainers), which is populated as follows:

val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()

In other words, the containers are allocated by the ResourceManager in response to the allocate call made through AMRMClient.
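For reference, here is a rough standalone sketch of that allocation path using the Hadoop YARN AMRMClient API (simplified: the registration parameters are placeholders, container requests and error handling are omitted):

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

def pollAllocatedContainers(): Unit = {
  val yarnConf = new YarnConfiguration()
  val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
  amClient.init(yarnConf)
  amClient.start()
  amClient.registerApplicationMaster("localhost", 0, "")

  // Each allocate() call acts as a heartbeat to the ResourceManager and
  // returns any containers allocated since the previous call.
  val response = amClient.allocate(0.1f)
  val allocatedContainers = response.getAllocatedContainers.asScala
  allocatedContainers.foreach { c =>
    println(s"allocated ${c.getId} on ${c.getNodeId.getHost}")
  }
}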

YarnConfiguration

The YarnConfiguration comes from YarnAllocator's constructor: its third constructor argument, conf, is the YarnConfiguration, and it is passed in by YarnRMClient's register method:

def register(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    uiAddress: Option[String],
    uiHistoryAddress: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]): YarnAllocator = {
  ... // other operations omitted
  new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
    localResources, new SparkRackResolver())
}

register, in turn, is called from ApplicationMaster's registerAM method:

private def registerAM(
    _sparkConf: SparkConf,
    _rpcEnv: RpcEnv,
    driverRef: RpcEndpointRef,
    uiAddress: Option[String],
    securityMgr: SecurityManager) = {
  allocator = client.register(driverUrl,
    driverRef,
    yarnConf,
    _sparkConf,
    uiAddress,
    historyAddress,
    securityMgr,
    localResources)
}

yarnConf is defined in ApplicationMaster:

private val sparkConf = new SparkConf()
private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)

SparkHadoopUtil's newConfiguration method builds a Hadoop Configuration from the SparkConf:
def newConfiguration(conf: SparkConf): Configuration = {
  val hadoopConf = new Configuration()
  appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
  hadoopConf
}
and appendS3AndSparkHadoopConfigurations fills it in:
def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
  // Note: this null check is around more than just access to the "conf" object to maintain
  // the behavior of the old implementation of this code, for backwards compatibility.
  if (conf != null) {
    // Explicitly check for S3 environment variables
    val keyId = System.getenv("AWS_ACCESS_KEY_ID")
    val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
    if (keyId != null && accessKey != null) {
      hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
      hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
      hadoopConf.set("fs.s3a.access.key", keyId)
      hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
      hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
      hadoopConf.set("fs.s3a.secret.key", accessKey)
      val sessionToken = System.getenv("AWS_SESSION_TOKEN")
      if (sessionToken != null) {
        hadoopConf.set("fs.s3a.session.token", sessionToken)
      }
    }
    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      }
    }
    val bufferSize = conf.get("spark.buffer.size", "65536")
    hadoopConf.set("io.file.buffer.size", bufferSize)
  }
}

So, as the code shows, yarnConf is really an org.apache.hadoop.conf.Configuration built from three parts: 1. the S3 credential properties above; 2. every Spark setting whose key starts with "spark.hadoop." (with that prefix stripped); 3. the io.file.buffer.size property, taken from spark.buffer.size.
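A tiny standalone illustration of the "spark.hadoop.*" copying behavior (not Spark source, just the same logic applied to one setting):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.hadoop.fs.defaultFS", "hdfs://nn:8020")
val hadoopConf = new Configuration()
conf.getAll.foreach { case (key, value) =>
  if (key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.substring("spark.hadoop.".length), value)
  }
}
// hadoopConf.get("fs.defaultFS") now returns "hdfs://nn:8020"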

SparkConf

The third parameter ExecutorRunnable needs is a SparkConf. Like the YarnConfiguration, it comes from YarnAllocator's constructor, which gets it from the parameters of YarnRMClient's register method. Tracing further up, the value originates in ApplicationMaster's registerAM method. registerAM is called from two places, and the SparkConf passed is different in each: the driver path and the executor-launcher path. Which path runs depends on whether we are in cluster mode (isClusterMode), and only one of the two is ever executed: if isClusterMode == true the driver path runs, otherwise the executor-launcher path runs.

The driver path (runDriver)

In cluster mode (isClusterMode == true), runDriver is called, and runDriver in turn calls registerAM. Here the SparkConf is taken from the SparkContext:

val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
  Duration(totalWaitTime, TimeUnit.MILLISECONDS))
...
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)

How the SparkContext is obtained on the driver side deserves a closer look later.

The executor-launcher path (runExecutorLauncher)

In client mode (isClusterMode == false), runExecutorLauncher is called, and it also calls registerAM. The SparkConf passed here is more straightforward: it is the one instantiated directly in the ApplicationMaster class:

private val sparkConf = new SparkConf()

masterAddress

The fourth parameter of ExecutorRunnable is masterAddress, a String. In YarnAllocator's runAllocatedContainers method, the value passed for it is driverUrl, which comes from YarnAllocator's constructor and is originally built in ApplicationMaster's registerAM method:

val driverUrl = RpcEndpointAddress(
  _sparkConf.get("spark.driver.host"),
  _sparkConf.get("spark.driver.port").toInt,
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

Here _sparkConf is the parameter of registerAM, i.e. the same SparkConf discussed above as the third parameter.
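For illustration only (not Spark source), assuming RpcEndpointAddress renders as "spark://<name>@<host>:<port>", the resulting driverUrl looks like this:

val host = "10.0.0.1"                         // spark.driver.host
val port = 35000                              // spark.driver.port
val endpointName = "CoarseGrainedScheduler"   // CoarseGrainedSchedulerBackend.ENDPOINT_NAME
val driverUrl = s"spark://$endpointName@$host:$port"
// => spark://CoarseGrainedScheduler@10.0.0.1:35000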

executorId

The fifth parameter of ExecutorRunnable is executorId, a String. Although it is a string, it is really just a number:

executorIdCounter += 1
val executorId = executorIdCounter.toString

and executorIdCounter is initialized by asking the driver:

private var executorIdCounter: Int = driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

hostname

The sixth parameter of ExecutorRunnable is hostname, a String. It is the hostname of the node the executor will run on, computed in YarnAllocator's runAllocatedContainers method:

val executorHostname = container.getNodeId.getHost

executorMemory

The seventh parameter of ExecutorRunnable is executorMemory, an Int. It specifies how much memory the executor may use; this is essentially the executor JVM heap size (the YARN container additionally reserves the off-heap memory overhead on top of it). The value comes from configuration:

protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt

private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
  .bytesConf(ByteUnit.MiB)
  .createWithDefaultString("1g")

It is read from spark.executor.memory and defaults to 1g (interpreted internally in MiB).
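For example (standard Spark configuration mechanisms; the values are only illustrative), the setting can be supplied on the command line or in code:

// spark-submit --executor-memory 4g ...
// or equivalently: --conf spark.executor.memory=4g
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.executor.memory", "4g")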

executorCores

The eighth parameter of ExecutorRunnable is executorCores, an Int. It specifies how many CPU cores the executor may use and also comes from configuration:

protected val executorCores = sparkConf.get(EXECUTOR_CORES)

private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
  .intConf
  .createWithDefault(1)

It is read from spark.executor.cores and defaults to 1. This value determines how many tasks an executor can run concurrently (together with spark.task.cpus), as sketched below.
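A small illustration of that relationship, assuming the default spark.task.cpus = 1 (illustrative arithmetic, not Spark source):

val executorCores = 4        // spark.executor.cores
val cpusPerTask   = 1        // spark.task.cpus (default)
val concurrentTasksPerExecutor = executorCores / cpusPerTask
// => up to 4 tasks can run at the same time inside one executor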

appId

The ninth parameter of ExecutorRunnable is appId, a String. It is derived from the ApplicationAttemptId that YarnAllocator receives in its constructor:

appAttemptId.getApplicationId.toString

The ApplicationAttemptId itself is obtained from YarnRMClient's getAttemptId() method:

def getAttemptId(): ApplicationAttemptId = {
  YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
}
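For reference (assuming YARN's standard toString formats; the values are made up), the application id and attempt id rendered as strings look roughly like this:

// illustrative values only
val appIdString        = "application_1530000000000_0001"          // appAttemptId.getApplicationId.toString
val appAttemptIdString = "appattempt_1530000000000_0001_000001"    // appAttemptId.toString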

SecurityManager

The tenth parameter of ExecutorRunnable is the SecurityManager object. It is also handed down through a chain of calls that starts in runDriver and runExecutorLauncher, but unlike the SparkConf, both paths use the same SecurityManager instance.

if (isClusterMode) {
  runDriver(securityMgr)
} else {
  runExecutorLauncher(securityMgr)
}

Both calls reference the same SecurityManager:

val securityMgr = new SecurityManager(sparkConf)

localResources

The last parameter of ExecutorRunnable is localResources, a Map[String, LocalResource]. It is defined in the ApplicationMaster class:

private val localResources = {
  val resources = HashMap[String, LocalResource]()

  def setupDistributedCache(
      file: String,
      rtype: LocalResourceType,
      timestamp: String,
      size: String,
      vis: String): Unit = {
    val uri = new URI(file)
    val amJarRsrc = Records.newRecord(classOf[LocalResource])
    amJarRsrc.setType(rtype)
    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
    amJarRsrc.setTimestamp(timestamp.toLong)
    amJarRsrc.setSize(size.toLong)
    val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
    resources(fileName) = amJarRsrc
  }

  val distFiles = sparkConf.get(CACHED_FILES)
  val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
  val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
  val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
  val resTypes = sparkConf.get(CACHED_FILES_TYPES)

  for (i <- 0 to distFiles.size - 1) {
    val resType = LocalResourceType.valueOf(resTypes(i))
    setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
      visibilities(i))
  }

  sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
    val uri = new URI(path)
    val fs = FileSystem.get(uri, yarnConf)
    val status = fs.getFileStatus(new Path(uri))
    // SPARK-16080: Make sure to use the correct name for the destination when distributing the
    // conf archive to executors.
    val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
      Client.LOCALIZED_CONF_DIR)
    setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
      status.getModificationTime().toString, status.getLen.toString,
      LocalResourceVisibility.PRIVATE.name())
  }

  CACHE_CONFIGS.foreach { e =>
    sparkConf.remove(e)
    sys.props.remove(e.key)
  }

  resources.toMap
}

What data localResources loads and caches will be covered later; here we are only tracing where each parameter comes from.

Launching

Once an ExecutorRunnable has been constructed, calling its run method launches the container (that is, the executor); this is exactly what runAllocatedContainers does. The run method of ExecutorRunnable is defined as follows:

def run(): Unit = {
  logDebug("Starting Executor Container")
  nmClient = NMClient.createNMClient()
  nmClient.init(conf)
  nmClient.start()
  startContainer()
}

In this method, an NMClient is created, initialized with the YarnConfiguration, and started; then startContainer is called to launch the container.
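For orientation, here is a standalone sketch of the NodeManager-side flow that startContainer follows, using the Hadoop YARN NMClient API. It is a simplified illustration, not Spark's actual method: the real code also prepares the executor's environment, the CoarseGrainedExecutorBackend command line, and security tokens, all of which are replaced by placeholders here.

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

def launchOnNodeManager(container: Container, yarnConf: YarnConfiguration): Unit = {
  val nmClient = NMClient.createNMClient()
  nmClient.init(yarnConf)
  nmClient.start()

  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  // In Spark this command starts CoarseGrainedExecutorBackend inside the container;
  // the command below is only a placeholder.
  ctx.setCommands(List("echo launching executor here").asJava)
  ctx.setEnvironment(Map.empty[String, String].asJava)

  // Ask the NodeManager that owns the allocated container to start it.
  nmClient.startContainer(container, ctx)
}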