Spark 2.11 Submit的流程

本文用于整理在使用spark-submit提交任务的流程

spark-submit脚本的定义

使用spark-submit提交任务的时候,实际上调用的是${SPARK_HOME}/bin/spark-submit来提交的。例如:

1
2
3
4
5
6
7
8
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

如下是${SPARK_HOME}/bin/spark-submit脚本的定义:

1
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

从代码可以看出,实际上执行的是 org.apache.spark.deploy.SparkSubmit类。因此我们具体看看这个类的实现。

SparkSubmit的启动流程

如下是SparkSubmit主函数的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}

进入主函数后,会使用SparkSubmitArguments类对SparkSubmit的命令行参数进行解析。然后根据参数中的action信息进行具体的操作。
由此也可以看出,SparkSubmit支持三种action:–submit、–kill和–status。

SparkSubmitArguments

SparkSubmitArgument的继承关系

image.png
其中SparkSubmitArgumentsParser是没有具体实现,SparkSubmitOptionParser主要用来解析option。

SparkSubmitArguments实例化执行

在SparkSubmit中会利用main的参数生成一个SparkSubmitArguments的,生成SparkSubmitArguments对象的时候就会执行如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try {
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()
validateArguments()

首先调用的是parse方法,而SparkSubmitArguments本身是没有这个方法的,但是它会从父类SparkSubmitOptionParser那里继承,因此调用的父类的parse方法。

SparkSubmitOptionParser

SparkSbumitOptionParser用来解析命令行提供的参数,从SparkSubmitOptionParser中我们可以看出,可以使用的参数列表如下:

标准参数
option的定义 意义 例如
–class ‘’
–conf ‘’,可以使用“-c”来代替
–deploy-mode 部署模式,有两种模式:client和cluster
–driver-class-path ‘’
–driver-cores ‘’
–driver-java-options ‘’
–driver-library-path ‘’
–driver-memory ‘’
–executor-memory 执行application的内存大小
–jars ‘’
–kill ‘’
–master ‘’ –master yarn-client
–name 提交的application的名字 –name spark_thrift_server_test
–packages ‘’
–exclude-packages ‘’
–properties-file ‘’
–proxy-user 提交任务所用的代理用户 –proxy-user myUser
–py-files ‘’
–repositories ‘’
–status ‘’
–total-executor-cores 执行application的最大executor的数量 –total-executor-cores 20
标识型参数。

这些option只会作为检测,不会取值。
|option的定义|意义|例如|
|-|-|-|
|–help|‘’,可以使用“-h”代替|–|
|–supervise|‘’|–|
|–usage-error|‘’|–|
|–verbose|‘’,可以使用“-v”代替|–|
|–version|‘’|–|

Yarn独享参数
option的定义 意义 例如
–archives ‘’
–executor-cores 执行任务的executor的core的数量 –executor-cores 3
–keytab ‘’
–num-executors 执行任务的executor的个数 –num-executors 10
–principal ‘’
–queue 执行任务的资源队列 –queue test_queue
option的定义和解析
非标识型参数

parse方法,实现了对命令行参数的解析。

1
Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

定义了获取参数的模式,必须以“–”开头,然后是不能为“=”的任意字符,接着是“=”号,最后是任意字符,例如:–name=testName。
但是这也不是必须的格式,在解析参数的代码中还包含了另外一种逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
arg = m.group(1);
value = m.group(2);
}
// Look for options with a value.
String name = findCliOption(arg, opts);
if (name != null) {
if (value == null) {
if (idx == args.size() - 1) {
throw new IllegalArgumentException(
String.format("Missing argument for option '%s'.", arg));
}
idx++;
value = args.get(idx);
}
if (!handle(name, value)) {
break;
}
continue;
}

可以第一个参数为key,第二个参数为value。
也就是说命令行参数中包含的参数,要么是key=value的格式,要么是key value的模式。也可以是两种格式的组合,但是必须要保证key value的匹配,如果key key=value,那么后面的这个key=value将会作为前面key的value来解析。另外,如果参数的最后一个为key,还会抛出IllegalArgumentException异常。
如下传递是正确的

1
key1 value1 key2=value2 key3 value3

如下传递是错误的

1
2
key1 key2=value2 -- 会把key2=value2当做key1的value
key1=value2 key2 -- 抛出IllegalArgumentException,无法获取key2的value

还有一个需要注意的是,如果提供了不在上面列表中的参数,会抛出UnsupportedOperationException异常,并退出程序的执行。

标识型参数

标志性参数只会检测这个参数是否存在,不会从命令行中读取值。

参数的处理

上面参数解析后,就会得到key和value。然后调用handle方法做进一步处理。SparkSubmitOptionParser要求子类必须实现handle方法,如果子类未实现handle方法,就会调用自身的handle方法,从而抛出UnsupportedOperationException异常。因此,我们看一下子类SplarkSubmitArguments的handle方法。

SparkSubmitArgumnet.handle

方法逻辑很简单,就是检测上面的key是否合法(规定的),如果合法,就设置相应属性的值。如下是合法参数名的值,以及value存储对应SparkSubmitArgumengt中变量,以及value的合法性要求:
|key的定义|value的存储变量|value的合法性要求|
|-|-|-|
|–name|name|-|
|–master|master|-|
|–class|mainClass|-|
|–deploy-mode|deployMode|client或cluster|
|–num-executors|numExecutors|-|
|–total-executor-cores|totalExecutorCores|-|
|–executor-cores|executorCores|-|
|–executor-memory|executorMemory|-|
|–driver-memory|driverMemory|-|
|–driver-cores|driverCores|-|
|–driver-class-path|driverExtraClassPath|-|
|–driver-java-options|driverExtraJavaOptions|-|
|–driver-library-path|driverExtraLibraryPath|-|
|–properties-file|propertiesFile|-|
|–kill|value保存在submissionToKill中,同时设置action = KILL|-|
|–status|value保存在submissionToRequestStatusFor中,同时设置action = REQUEST_STATUS|-|
|–supervise|设置supervise=true|-|
|–queue|queue|-|
|–files|files|-|
|–py-files|pyFiles|-|
|–archives|archives|-|
|–jars|jars|-|
|–packages|packages|-|
|–exclude-packages|packagesExclusions|-|
|–repositories|repositories|-|
|–conf或-c|解析properties文件后保存在sparkProperties中|-|
|–proxy-user|proxyUser|-|
|–principal|principal|-|
|–keytab|keytab|-|
|–help|执行printUsageAndExit(0),打印用法并退出|-|
|–verbose|设置verbose为true|-|
|–version|调用printVersionAndExit()方法,打印版本并退出|-|
|–usage-error|执行printUsageAndExit(1),打印用法并退出|-|

参数的解析流程

parse方法定义在SparkSubmitOptionParser类中,用于解析命令行传递过来的参数。参考上面的“option的定义和解析”部分。

合并默认的Spark属性

合并默认的Spark属性,调用的是mergeDefaultSparkProperties方法。
该方法将会从两个位置读取配置文件:

命令行中-–properties-file参数指定的属性文件。
SPARK_CONF_DIR目录或SPARK_HOME/conf下的spark-default.properties文件

其中优先使用–properties-file参数所指定的文件。
根据这里确定的属性文件,将属性加载到内存中作为默认属性与命令行中使用–conf或–c配置的属性进行合并,优先使用–conf或–c指定的属性。

驳回非Spark属性

什么属于非Spark属性呢?就是那些不以“spark”开头的属性。将上面合并后的属性进行遍历,将不是以“spark”开头的属性,从属性集合中移除。

加载环境参数

加载环境参数的意思就是对于那些通过上面操作,依然没有被设置的属性,从环境配置中再加在一次。基本上的思路就是判断属性是否已经有值,如果没有则从上面的属性中加在一次,如果还没有则再从环境中加在一次。所以参数的优先级如下:命令行中指定 -> 属性中配置(–conf-> –properties-file -> spark_home/conf/spark-default.properties) -> 环境

变量 属性中的配置项 环境中的配置项
master spark.master MASTER
driverExtraClassPath spark.driver.extraClassPath Null
driverExtraJavaOptions spark.driver.extraJavaOptions Null
driverExtraLibraryPath spark.driver.extraLibraryPath Null
driverMemory spark.driver.memory SPARK_DRIVER_MEMORY
driverCores spark.driver.cores Null
executorMemory spark.executor.memory SPARK_EXECUTOR_MEMORY
executorCores spark.executor.cores SPARK_EXECUTOR_CORES
totalExecutorCores spark.cores.max Null
name spark.app.name Null
jars spark.jars Null
files spark.files Null
ivyRepoPath(新增) spark.jars.ivy Null
packages spark.jars.packages Null
packagesExclusions spark.jars.excludes Null
deployMode spark.submit.deployMode DEPLOY_MODE
numExecutors spark.executor.instances Null
queue spark.yarn.queue Null
keytab spark.yarn.keytab Null
principal spark.yarn.principal Null

基本的设置就是以上这些,但是除了这些,还有其他一些逻辑:

主类的确定

当没有通过mainClass指定主类,且不是python或R时,会从jar包中读取主类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
val uriScheme = uri.getScheme()
uriScheme match {
case "file" =>
try {
val jar = new JarFile(uri.getPath)
// Note that this might still return null if no main-class is set; we catch that later
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
}

master的确定

如果没有设置master,则将master设置为local[*]

name的确定

如果master是以“yarn”开头的,则使用当前的name,如果没有设置,则从环境变量SPARK_YARN_APP_NAME中获取;如果环境中也没有,则从主类中获取,如果主类中也没有,则从primaryResource中获取。

1
2
3
4
5
6
7
8
9
if (master.startsWith("yarn")) {
name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
}
// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
name = Utils.stripDirectory(primaryResource)
}

action的确定

如果没有设置action,则将action默认设置为submit。

验证参数

验证参数是要根据action,进行区分验证,不同的action有不同的参数要求。

submit

对于提交参数的验证,要求如下

必须指定主要资源:也就是说jar、python或R不能同时为空。
主类必须通过–class指定或者在Jar中包含。
如果指定了pyFile,则要求主资源必须是Python脚本。
如果master是yarn,则要求环境变量中必须包含HADOOP_CONF_DIR或YARN_CONF_DIR
配置中不能同时存在 proxyUser和principal。

为什么proxyUser和principal不能同时存在呢?因为这是两种不同的认证方式,只能使用一种。对于proxyUser方式,会调用Hadoop的相关API创建代理用户,然后用代理用户执行runMain方法。如果设置了principal,就必须设置–keytab来指定keytab文件,然后会使用keytab信息进行登录。

kill

对于kill参数的验证,要求如下

master必须是以spark://或mesos://开头的,其他的不支持kill
submissionToKill必须指定,也就是必须指定要kill的submission。

status

对于请求状态的验证,要求如下

master必须是以spark://或mesos://开头的,其他的不支持status查询
submissionToRequestStatusFor必须指定,也就是必须说明要查询状态的信息。

至此SparkSubmitArguments对参数的加载(从命令行、配置文件、环境变量)和验证就完成。我们继续回到SparkSubmit中。

Spark Submit

我们已经知道SparkSubmit类支持三种action,现在我们先看看当action为submit时的相关操作。
用来处理action为“submit”的是submit方法,在submit方法中,大体分为三个步骤:提交环境准备、根据代理用户进行操作、根据部署模式进行操作。
对于代理用户的相关操作,就是判断是否指定了代理用户,如果指定了代理用户,则使用代理用户的身份执行runMain,如果没有指定代理用户,则直接执行runMain(相当于使用当前用户)。
对于部署模式的相关操作,基本上都是调用doRunMain方法,只是对于standalone模式下,如果出现异常会做一些其他操作。doRunMain方法,就是根据代理用户进行操作。
所以,这里会将主要任务落到两个方法上:runMain和prepareSubmitEnvironment。

prepareSubmitEnvironment

提交前会进行环境的准备,环境准备通过prepareSubmitEnvironment方法实现。该方法的代码量很大,但是基本上就是验证参数的正确性、参数的合法性、某些未写参数的补充、以及执行类的确定。
这里需要注意下参数的变换,我们上面已经知道配置参数可以通过命令行、命令行的配置文件、默认配置文件和环境变量中得到。这里再生成下一个主类使用的参数时,参数只会包含三个类型:–class、–jar、–arg(对于python会包含–primary-py-file,对于R会包含–primary-r-file)。所以那些属性会作为–arg进行提供。

runMain

runMain就是使用prepareSubmitEnvironment确定的环境变量和属性来执行prepareSubmitEnvironment中确定的主类,直接执行主类的main方法。

例如,对于Yarn集群模式,执行的就是org.apache.spark.deploy.yarn.Client。

Client

Client.submitApplication

org.apache.spark.deploy.yarn.Client类中submitApplication方法实现了application的提交逻辑,基本流程如图:
image.png

创建证书

创建证书是通过setupCredentials方法实现的,其定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def setupCredentials(): Unit = {
// 判断是否是使用Kerberos进行登录,如果配置中设置了principal就表示使用kerberos,而且要求配置了Keytab信息
loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
if (loginFromKeytab) {
principal = sparkConf.get(PRINCIPAL).get
keytab = sparkConf.get(KEYTAB).orNull
require(keytab != null, "Keytab must be specified when principal is specified.")
// 加载Keytab文件,并对文件名
val f = new File(keytab)
amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
sparkConf.set(PRINCIPAL.key, principal)
}
// 创建当前用户的证书的拷贝
credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
}

创建yarnClient并创建Application

submitApplication方法在设置完证书后,就会使用yarnClient来创建Application,并得到Applcation的相关信息(包括application id、application submission context等)以供后面使用,其代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
// 初始化yarnClient,并启动
yarnClient.init(yarnConf)
yarnClient.start()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// 并得到新application的响应信息,并从响应信息中得到application的id
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()

资源验证

资源验证的目的就是检查Yarn集群中是否有足够的内存来运行Application Master,该功能通过verifyClusterResources方法来实现,AM所需的内存资源为内存和负载内存之和。
对于内存和负载内存的配置值,会根据集群模式的不同而取值不同。
对于集群模式,会从spark.driver.memory和spark.yarn.driver.memoryOverhead配置中读取,对于非集群模式,则会从spark.yarn.am.memory和spark.yarn.am.memoryOverhead中读取。
如果没有设置负载内存,负载内存还有一个推算公式:max((0.10 * 内存), 384L)

创建Container启动上下文

创建Container启动上下文是通过createContainerLaunchContext方法实现。对于这个方法,其功能大致分为三个部分:启动环境准备、资源准备和启动命令的拼接。

启动环境准备

启动环境的准备是通过setupLaunchEnv方法实现的。 – 以后补充

资源准备

资源的准备是通过prepareLocalResources方法实现的。下面将详细介绍这个方法。

证书的管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 从证书管理器那里获得证书下一次更新的时间
val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
// 如果已经有证书了,则将证书设置给当前用户
if (credentials != null) {
UserGroupInformation.getCurrentUser.addCredentials(credentials)
logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
}
// 如果我们使用principal和keytab登录,那么证书需要在之后的时间被重新构建,我们应当将下一次的重构和更新时间传递给构建者和更新者
if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
nearestTimeOfNextRenewal != Long.MaxValue) {
val currTime = System.currentTimeMillis()
val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
}

证书管理的功能就是:如果证书存在,将证书添加到当前用户中;根据时间设置证书的重新生成时间和更新时间。

资源添加和验证
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 用于保存添加到分发缓存的URI列表,如果同一个URI被添加多次,YARN将以内部错误使container启动失败
val distributedUris = new HashSet[String]
// 用于保存添加到分发缓存的URI是否有相同的名字,如果有相同的名字的被提交多次,但是文件路径不同,Yarn将以内部错误是container启动失败
val distributedNames = new HashSet[String]
val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
.getOrElse(fs.getDefaultReplication(destDir))
// 用来保存本地资源的集合
val localResources = HashMap[String, LocalResource]()
// 在HDFS上创建 staging目录,并将目录的访问权限设置为700
FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
// 保存文件的文件状态,不是以文件为key而是一个文件的URI作为key
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
// 获取要分发的文件名
val fileName = new File(uri.getPath).getName
// 如果URI已经被添加多次,添加失败,如果文件名也被添加多次,添加失败,否则添加成功
if (distributedUris.contains(uriStr)) {
logWarning(s"Same path resource $uri added multiple times to distributed cache.")
false
} else if (distributedNames.contains(fileName)) {
logWarning(s"Same name resource $uri added multiple times to distributed cache")
false
} else {
distributedUris += uriStr
distributedNames += fileName
true
}
}

distributedUris和distributedNames用来进行资源添加的验证(在addDistributedUri中使用),在添加资源的时候,资源是以URI的方式来添加,对于同一个URI只允许添加一次,并且如果URI不同,但是文件名相同的情况,也会验证,同一个文件名的文件也只允许添加一次。另外代码中还设置了资源文件的存放位置((spark.yarn.stagingDir|~)/.sparkStaging/application_id)、资源文件的副本数(3)以及资源文件目录的权限(700)。addDistributedUri方法用来进行验证。

资源分发操作

distribute方法是资源分发的实现。具体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def distribute(
path: String,
resType: LocalResourceType = LocalResourceType.FILE,
destName: Option[String] = None,
targetDir: Option[String] = None,
appMasterOnly: Boolean = false): (Boolean, String) = {
val trimmedPath = path.trim()
val localURI = Utils.resolveURI(trimmedPath)
// 判断URI是否是以local开头的
if (localURI.getScheme != LOCAL_SCHEME) {
// 将URI添加到分发缓存,同时会对URI和FileName进行验证,不允许重复添加
if (addDistributedUri(localURI)) {
val localPath = getQualifiedLocalPath(localURI, hadoopConf)
// 拼接在混存中保存的名字
val linkname = targetDir.map(_ + "/").getOrElse("") +
destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
// 保存文件的缓存子目录
val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
// 目标文件系统,HDFS
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
// 将文件添加到缓存
// destFs 是HDFS文件系统
distCacheMgr.addResource(
destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
appMasterOnly = appMasterOnly)
(false, linkname)
} else {
(false, null)
}
} else {
(true, trimmedPath)
}
}

这个方法的作用就是将不是以“local”开头的文件添加到分发缓存中(调用addDistributedUri方法),添加的时候会进行验证。copyFileToRemote方法的作用,就是将本地的文件(file://)上传到HDFS上,copyFileToRemote方法中会进行两个文件系统的对比,只有当源文件系统和目标文件系统不同(不同的HDFS或一个是HDFS一个是普通文件系统)才会进行复制文件。最后调用ClientDistributedCacheManager的addResource方法将文件加入到Resource列表中供后面启动container使用。

分发Keytab文件
1
2
3
4
5
6
7
8
if (loginFromKeytab) {
logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
" via the YARN Secure Distributed Cache.")
val (_, localizedPath) = distribute(keytab,
destName = Some(amKeytabFileName),
appMasterOnly = true)
require(localizedPath != null, "Keytab file already distributed.")
}

amKeytabFileName是添加了UUID后缀的名字,作用就是调用distribute方法,将keytab文件分发,但是只是分发给Application Master。

jar文件的分发逻辑

在对Jar进行分发的时候,会有三种情况:

设置了spark.yarn.archive
设置了spark.yarn.jars
spark.yarn.archeive和spark.yarn.jars都没有设设置

当同时设置了spark.yarn.archive和spark.yarn.jars时,spark.yarn.archive优先级高。

spark.yarn.archive

如果配置了spark.yarn.archive(目录,且要求必须不是以local开头的文件系统),则将spark.yarn.archive配置的目录作为ARCHIVE类型的资源分发到“spark_libs”目录中。

spark.yarn.jars

如果配置了spark.yarn.jars(必须是文件,多个文件用逗号分隔),则将spark.yarn.jars中的每个文件作为FILE类型资源分发到“spark_libs”目录中。
对于local文件(以local://)开头的文件,会将其重新设置到sparkConf中的“spark.yarn.jars”配置项中。

上传${SPARK_HOME}下的jars

对于既没有配置spark.yarn.archive又没有配置spark.yarn.jars,那么系统会将环境变量${SPARK_HOME}中指定的目录下的jars或assembly/target/scala-%{scala_version}}/jars目录中的jar打包为一个“spark_libs.zip”文件,然后将这个文件作为ARCHIVE类型分发到“spark_libs”目录。

看了三种情况,这里有一个问题,对于配置了spark,yarn.archive和什么都没有配置的情况,都是将URI作为ARCHIVE类型资源分发到“spark_libs”目录中,那么这两个参数还有什么作用么?其实最主要的区别就是在进行文件拷本的时候,也就是调用copyFileToRemote方法的时候,可以减少上传操作。

如下是jar文件分发的逻辑实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
// 如果定义了 spark.yarn.archive
if (sparkArchive.isDefined) {
val archive = sparkArchive.get
// 要求 spark.yarn.archive 的value不是是本地目录(local://开头)
require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
distribute(Utils.resolveURI(archive).toString,
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_LIB_DIR))
} else {
sparkConf.get(SPARK_JARS) match {
case Some(jars) =>
// Break the list of jars to upload, and resolve globs.
// 将spark.yarn.jars中配置的jar(非本地jar)进行分发
val localJars = new ArrayBuffer[String]()
jars.foreach { jar =>
if (!isLocalUri(jar)) {
// 得到jar的路径
val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
// 得到文件系统
val pathFs = FileSystem.get(path.toUri(), hadoopConf)
pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
val uri = entry.getPath().toUri()
statCache.update(uri, entry)
// 分发jar
distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
}
} else {
localJars += jar
}
}
sparkConf.set(SPARK_JARS, localJars)
case None =>
// No configuration, so fall back to uploading local jar files.
// 没有配置 spark.yarn.archive和spark.yarn.jars 需要上传本地的jar包
logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
"to uploading libraries under SPARK_HOME.")
// 在Spark home目录下查找存放jar的目录
val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
sparkConf.getenv("SPARK_HOME")))
// 创建一个名为 __spark_libs__.zip的
val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
new File(Utils.getLocalDir(sparkConf)))
// 利用 __spark_libs__.zip创建输出流
val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
// 将jar目录下的所有jar文件,添加到__spark_libs__.zip的输出流中,这里其实就是将 jar打包成一个 __spark_libs__.zip的文件
try {
jarsStream.setLevel(0)
jarsDir.listFiles().foreach { f =>
if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
jarsStream.putNextEntry(new ZipEntry(f.getName))
Files.copy(f, jarsStream)
jarsStream.closeEntry()
}
}
} finally {
jarsStream.close()
}
// 将打包好的__spark_libs__.zip进行分发, 分发到__spark_libs__目录下
distribute(jarsArchive.toURI.getPath,
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_LIB_DIR))
jarsArchive.delete()
}
}

看到代码的实现,这里还有一个问题,对于配置spark.yarn.jars的情况,如果其中的jar是以“local://”开头的,会将这些文件加入到列表中,然后用这个列表来重新设置spark.yarn.jars的配置,那么后续还会怎么处理呢?也就是说对于以local://开头配置的文件,要怎么处理呢?

分发用户Jar包

对于用户的jar包(通过–jar参数指定的),会将其以“app.jar”进行分发

1
2
3
4
5
6
7
8
9
Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
if (isLocal) {
require(localizedPath != null, s"Path $jar already distributed")
// If the resource is intended for local use only, handle this downstream
// by setting the appropriate property
sparkConf.set(APP_JAR, localizedPath)
}
}

其他需要分发的

除了上面那些分发的内容,用户还可以设置spark.yarn.dist.jars、spark.yarn.dist.files或spark.yarn.dist.archives配置项来制定自己要分发的文件,对于spark.yarn.dist.jars指定的jar,会被添加到classPath中,另外两个不会。需要注意的是这些文件不能和之前上传的URI或文件名相同,否则不会分发。然后这些值会作为配置项“spark.yarn.secondary.jars”的信息进行设置。对于那些AM不需要,但是executor需要的jar,可以通过这种方式来配置。

配置项的上传

除了上面的jar包、文件的分发,系统还会上传,配置项会被上传到(spark.yarn.stagingDir|~)/.sparkStaging/application_id/spark_conf.zip位置。
上传的内容配置内容如下,“spark.overlay.hadoop.conf.filenames”配置项中指定的配置文件、log4j.properties、metrics.properties、环境变量“HADOOP_CONF_DIR”目录中的文件、环境变量“YARN_CONF_DIR”目录中的文件。其中如果有相同的配置文件,那么最后两个环境变量中的文件优先级最低。除了这些文件,系统还会将内存中sparkConf的属性和Keytab文件中的属性,以“spark_conf.properties”进行保存,一起添加到压缩文件spark_conf.zip中,一起上传到spark.yarn.stagingDir|~)/.sparkStaging/application_id/spark_conf.zip位置。

这些资源如何传递给container呢?答案是生成ContainerLaunchContext时(Records.newRecord(classOf[ContainerLaunchContext])–这样创建),作为localResources属性设置的。除了localResource,还有一个environment需要设置,环境信息和资源信息一样,也会上传到“(spark.yarn.stagingDir|~)/.sparkStaging/application_id/”目录下。

启动命令的拼接

启动命令的拼接就是为了要拼成 /bin/java -server ${-javaOpts} ${amArg}这样的命令来启动另外一个java类。对于要启动哪个类,如下判断:

对于集群模式将会启动org.apache.spark.deploy.yarn.ApplicationMaster对象,赋予非集群模式将启动org.apache.spark.deploy.yarn.ExecutorLauncher对象。

对于集群模式,启动ApplicationManster,这样就与之前ApplicationMaster的运行流程连接起来。

创建Application提交上下文

创建Container提交上下文是通过createApplicationSubmissionContext方法实现的。与其说是创建不如说设置,因为这个上下文是通过newApp.getApplicationSubmissionContext得到的,而newApp是通过yarn客户端调用createApplication得到的。
这个方法对Context设置的信息如下

context属性 取值
applicationName spark.app.name配置项,默认为Spark
queue spark.yarn.queue配置项
AMContainerSpec 上面生成的ContainerLaunchContext
applcationType “SPARK”
applicationTags spark.yarn.tags配置项,如果是多值,逐个设置
maxAppAttempts spark.yarn.maxAppAttempts配置项
attemptFailuresValidityInterval spark.yarn.am.attemptFailuresValidityInterval配置项
AMContainerResourceRequest 新生成的ResourceRequest对象,ResourceRequest对象包含的信息,如下表,该配置只有在spark.yarn.am.nodeLabelExpression设置时有效
resource 新生成的Resource对象,包括所需的内存和CPU,该配置只有在spark.yarn.am.nodeLabelExpression没有设置时有效
logAggregationContext 新生成的LogAggregationContext,只有在spark.yarn.rolledLog.includePattern设置时有效

ResourceRequest包含的信息如下
|属性|取值|
|-|-|
|resourceName|*,等|
|priority|Priority对象,默认为0|
|capability|Resource对象,包含内存和cpu|
|numContainers|所需container的数量|
|nodeLableExpression|节点标签表达式 —- 这个在申请资源时有什么作用|

将上面补充好的ApplicationSubmissionContext对象作为参数,通过yarn客户端的submitApplication方法,就完成了application的提交。

得到Application提交上下文之后,便可以调用yarnClient来提交Application。