本文用于整理在使用spark-submit提交任务的流程
spark-submit脚本的定义
使用spark-submit提交任务的时候,实际上调用的是${SPARK_HOME}/bin/spark-submit来提交的。例如:
如下是${SPARK_HOME}/bin/spark-submit脚本的定义:
从代码可以看出,实际上执行的是 org.apache.spark.deploy.SparkSubmit类。因此我们具体看看这个类的实现。
SparkSubmit的启动流程
如下是SparkSubmit主函数的定义:
|
|
进入主函数后,会使用SparkSubmitArguments类对SparkSubmit的命令行参数进行解析。然后根据参数中的action信息进行具体的操作。
由此也可以看出,SparkSubmit支持三种action:–submit、–kill和–status。
SparkSubmitArguments
SparkSubmitArgument的继承关系
其中SparkSubmitArgumentsParser是没有具体实现,SparkSubmitOptionParser主要用来解析option。
SparkSubmitArguments实例化执行
在SparkSubmit中会利用main的参数生成一个SparkSubmitArguments的,生成SparkSubmitArguments对象的时候就会执行如下代码:
首先调用的是parse方法,而SparkSubmitArguments本身是没有这个方法的,但是它会从父类SparkSubmitOptionParser那里继承,因此调用的父类的parse方法。
SparkSubmitOptionParser
SparkSbumitOptionParser用来解析命令行提供的参数,从SparkSubmitOptionParser中我们可以看出,可以使用的参数列表如下:
标准参数
option的定义 | 意义 | 例如 |
---|---|---|
–class | ‘’ | – |
–conf | ‘’,可以使用“-c”来代替 | – |
–deploy-mode | 部署模式,有两种模式:client和cluster | – |
–driver-class-path | ‘’ | – |
–driver-cores | ‘’ | – |
–driver-java-options | ‘’ | – |
–driver-library-path | ‘’ | – |
–driver-memory | ‘’ | – |
–executor-memory | 执行application的内存大小 | – |
–jars | ‘’ | – |
–kill | ‘’ | – |
–master | ‘’ | –master yarn-client |
–name | 提交的application的名字 | –name spark_thrift_server_test |
–packages | ‘’ | – |
–exclude-packages | ‘’ | – |
–properties-file | ‘’ | – |
–proxy-user | 提交任务所用的代理用户 | –proxy-user myUser |
–py-files | ‘’ | – |
–repositories | ‘’ | – |
–status | ‘’ | – |
–total-executor-cores | 执行application的最大executor的数量 | –total-executor-cores 20 |
标识型参数。
这些option只会作为检测,不会取值。
|option的定义|意义|例如|
|-|-|-|
|–help|‘’,可以使用“-h”代替|–|
|–supervise|‘’|–|
|–usage-error|‘’|–|
|–verbose|‘’,可以使用“-v”代替|–|
|–version|‘’|–|
Yarn独享参数
option的定义 | 意义 | 例如 |
---|---|---|
–archives | ‘’ | – |
–executor-cores | 执行任务的executor的core的数量 | –executor-cores 3 |
–keytab | ‘’ | – |
–num-executors | 执行任务的executor的个数 | –num-executors 10 |
–principal | ‘’ | – |
–queue | 执行任务的资源队列 | –queue test_queue |
option的定义和解析
非标识型参数
parse方法,实现了对命令行参数的解析。
定义了获取参数的模式,必须以“–”开头,然后是不能为“=”的任意字符,接着是“=”号,最后是任意字符,例如:–name=testName。
但是这也不是必须的格式,在解析参数的代码中还包含了另外一种逻辑:
可以第一个参数为key,第二个参数为value。
也就是说命令行参数中包含的参数,要么是key=value的格式,要么是key value的模式。也可以是两种格式的组合,但是必须要保证key value的匹配,如果key key=value,那么后面的这个key=value将会作为前面key的value来解析。另外,如果参数的最后一个为key,还会抛出IllegalArgumentException异常。
如下传递是正确的
如下传递是错误的
还有一个需要注意的是,如果提供了不在上面列表中的参数,会抛出UnsupportedOperationException异常,并退出程序的执行。
标识型参数
标志性参数只会检测这个参数是否存在,不会从命令行中读取值。
参数的处理
上面参数解析后,就会得到key和value。然后调用handle方法做进一步处理。SparkSubmitOptionParser要求子类必须实现handle方法,如果子类未实现handle方法,就会调用自身的handle方法,从而抛出UnsupportedOperationException异常。因此,我们看一下子类SplarkSubmitArguments的handle方法。
SparkSubmitArgumnet.handle
方法逻辑很简单,就是检测上面的key是否合法(规定的),如果合法,就设置相应属性的值。如下是合法参数名的值,以及value存储对应SparkSubmitArgumengt中变量,以及value的合法性要求:
|key的定义|value的存储变量|value的合法性要求|
|-|-|-|
|–name|name|-|
|–master|master|-|
|–class|mainClass|-|
|–deploy-mode|deployMode|client或cluster|
|–num-executors|numExecutors|-|
|–total-executor-cores|totalExecutorCores|-|
|–executor-cores|executorCores|-|
|–executor-memory|executorMemory|-|
|–driver-memory|driverMemory|-|
|–driver-cores|driverCores|-|
|–driver-class-path|driverExtraClassPath|-|
|–driver-java-options|driverExtraJavaOptions|-|
|–driver-library-path|driverExtraLibraryPath|-|
|–properties-file|propertiesFile|-|
|–kill|value保存在submissionToKill中,同时设置action = KILL|-|
|–status|value保存在submissionToRequestStatusFor中,同时设置action = REQUEST_STATUS|-|
|–supervise|设置supervise=true|-|
|–queue|queue|-|
|–files|files|-|
|–py-files|pyFiles|-|
|–archives|archives|-|
|–jars|jars|-|
|–packages|packages|-|
|–exclude-packages|packagesExclusions|-|
|–repositories|repositories|-|
|–conf或-c|解析properties文件后保存在sparkProperties中|-|
|–proxy-user|proxyUser|-|
|–principal|principal|-|
|–keytab|keytab|-|
|–help|执行printUsageAndExit(0),打印用法并退出|-|
|–verbose|设置verbose为true|-|
|–version|调用printVersionAndExit()方法,打印版本并退出|-|
|–usage-error|执行printUsageAndExit(1),打印用法并退出|-|
参数的解析流程
parse方法定义在SparkSubmitOptionParser类中,用于解析命令行传递过来的参数。参考上面的“option的定义和解析”部分。
合并默认的Spark属性
合并默认的Spark属性,调用的是mergeDefaultSparkProperties方法。
该方法将会从两个位置读取配置文件:
命令行中-–properties-file参数指定的属性文件。
SPARK_CONF_DIR目录或SPARK_HOME/conf下的spark-default.properties文件
其中优先使用–properties-file参数所指定的文件。
根据这里确定的属性文件,将属性加载到内存中作为默认属性与命令行中使用–conf或–c配置的属性进行合并,优先使用–conf或–c指定的属性。
驳回非Spark属性
什么属于非Spark属性呢?就是那些不以“spark”开头的属性。将上面合并后的属性进行遍历,将不是以“spark”开头的属性,从属性集合中移除。
加载环境参数
加载环境参数的意思就是对于那些通过上面操作,依然没有被设置的属性,从环境配置中再加在一次。基本上的思路就是判断属性是否已经有值,如果没有则从上面的属性中加在一次,如果还没有则再从环境中加在一次。所以参数的优先级如下:命令行中指定 -> 属性中配置(–conf-> –properties-file -> spark_home/conf/spark-default.properties) -> 环境
变量 | 属性中的配置项 | 环境中的配置项 |
---|---|---|
master | spark.master | MASTER |
driverExtraClassPath | spark.driver.extraClassPath | Null |
driverExtraJavaOptions | spark.driver.extraJavaOptions | Null |
driverExtraLibraryPath | spark.driver.extraLibraryPath | Null |
driverMemory | spark.driver.memory | SPARK_DRIVER_MEMORY |
driverCores | spark.driver.cores | Null |
executorMemory | spark.executor.memory | SPARK_EXECUTOR_MEMORY |
executorCores | spark.executor.cores | SPARK_EXECUTOR_CORES |
totalExecutorCores | spark.cores.max | Null |
name | spark.app.name | Null |
jars | spark.jars | Null |
files | spark.files | Null |
ivyRepoPath(新增) | spark.jars.ivy | Null |
packages | spark.jars.packages | Null |
packagesExclusions | spark.jars.excludes | Null |
deployMode | spark.submit.deployMode | DEPLOY_MODE |
numExecutors | spark.executor.instances | Null |
queue | spark.yarn.queue | Null |
keytab | spark.yarn.keytab | Null |
principal | spark.yarn.principal | Null |
基本的设置就是以上这些,但是除了这些,还有其他一些逻辑:
主类的确定
当没有通过mainClass指定主类,且不是python或R时,会从jar包中读取主类:
master的确定
如果没有设置master,则将master设置为local[*]
name的确定
如果master是以“yarn”开头的,则使用当前的name,如果没有设置,则从环境变量SPARK_YARN_APP_NAME中获取;如果环境中也没有,则从主类中获取,如果主类中也没有,则从primaryResource中获取。
action的确定
如果没有设置action,则将action默认设置为submit。
验证参数
验证参数是要根据action,进行区分验证,不同的action有不同的参数要求。
submit
对于提交参数的验证,要求如下
必须指定主要资源:也就是说jar、python或R不能同时为空。
主类必须通过–class指定或者在Jar中包含。
如果指定了pyFile,则要求主资源必须是Python脚本。
如果master是yarn,则要求环境变量中必须包含HADOOP_CONF_DIR或YARN_CONF_DIR
配置中不能同时存在 proxyUser和principal。
为什么proxyUser和principal不能同时存在呢?因为这是两种不同的认证方式,只能使用一种。对于proxyUser方式,会调用Hadoop的相关API创建代理用户,然后用代理用户执行runMain方法。如果设置了principal,就必须设置–keytab来指定keytab文件,然后会使用keytab信息进行登录。
kill
对于kill参数的验证,要求如下
master必须是以spark://或mesos://开头的,其他的不支持kill
submissionToKill必须指定,也就是必须指定要kill的submission。
status
对于请求状态的验证,要求如下
master必须是以spark://或mesos://开头的,其他的不支持status查询
submissionToRequestStatusFor必须指定,也就是必须说明要查询状态的信息。
至此SparkSubmitArguments对参数的加载(从命令行、配置文件、环境变量)和验证就完成。我们继续回到SparkSubmit中。
Spark Submit
我们已经知道SparkSubmit类支持三种action,现在我们先看看当action为submit时的相关操作。
用来处理action为“submit”的是submit方法,在submit方法中,大体分为三个步骤:提交环境准备、根据代理用户进行操作、根据部署模式进行操作。
对于代理用户的相关操作,就是判断是否指定了代理用户,如果指定了代理用户,则使用代理用户的身份执行runMain,如果没有指定代理用户,则直接执行runMain(相当于使用当前用户)。
对于部署模式的相关操作,基本上都是调用doRunMain方法,只是对于standalone模式下,如果出现异常会做一些其他操作。doRunMain方法,就是根据代理用户进行操作。
所以,这里会将主要任务落到两个方法上:runMain和prepareSubmitEnvironment。
prepareSubmitEnvironment
提交前会进行环境的准备,环境准备通过prepareSubmitEnvironment方法实现。该方法的代码量很大,但是基本上就是验证参数的正确性、参数的合法性、某些未写参数的补充、以及执行类的确定。
这里需要注意下参数的变换,我们上面已经知道配置参数可以通过命令行、命令行的配置文件、默认配置文件和环境变量中得到。这里再生成下一个主类使用的参数时,参数只会包含三个类型:–class、–jar、–arg(对于python会包含–primary-py-file,对于R会包含–primary-r-file)。所以那些属性会作为–arg进行提供。
runMain
runMain就是使用prepareSubmitEnvironment确定的环境变量和属性来执行prepareSubmitEnvironment中确定的主类,直接执行主类的main方法。
例如,对于Yarn集群模式,执行的就是org.apache.spark.deploy.yarn.Client。
Client
Client.submitApplication
org.apache.spark.deploy.yarn.Client类中submitApplication方法实现了application的提交逻辑,基本流程如图:
创建证书
创建证书是通过setupCredentials方法实现的,其定义如下
创建yarnClient并创建Application
submitApplication方法在设置完证书后,就会使用yarnClient来创建Application,并得到Applcation的相关信息(包括application id、application submission context等)以供后面使用,其代码实现如下:
资源验证
资源验证的目的就是检查Yarn集群中是否有足够的内存来运行Application Master,该功能通过verifyClusterResources方法来实现,AM所需的内存资源为内存和负载内存之和。
对于内存和负载内存的配置值,会根据集群模式的不同而取值不同。
对于集群模式,会从spark.driver.memory和spark.yarn.driver.memoryOverhead配置中读取,对于非集群模式,则会从spark.yarn.am.memory和spark.yarn.am.memoryOverhead中读取。
如果没有设置负载内存,负载内存还有一个推算公式:max((0.10 * 内存), 384L)
创建Container启动上下文
创建Container启动上下文是通过createContainerLaunchContext方法实现。对于这个方法,其功能大致分为三个部分:启动环境准备、资源准备和启动命令的拼接。
启动环境准备
启动环境的准备是通过setupLaunchEnv方法实现的。 – 以后补充
资源准备
资源的准备是通过prepareLocalResources方法实现的。下面将详细介绍这个方法。
证书的管理
|
|
证书管理的功能就是:如果证书存在,将证书添加到当前用户中;根据时间设置证书的重新生成时间和更新时间。
资源添加和验证
|
|
distributedUris和distributedNames用来进行资源添加的验证(在addDistributedUri中使用),在添加资源的时候,资源是以URI的方式来添加,对于同一个URI只允许添加一次,并且如果URI不同,但是文件名相同的情况,也会验证,同一个文件名的文件也只允许添加一次。另外代码中还设置了资源文件的存放位置((spark.yarn.stagingDir|~)/.sparkStaging/application_id)、资源文件的副本数(3)以及资源文件目录的权限(700)。addDistributedUri方法用来进行验证。
资源分发操作
distribute方法是资源分发的实现。具体定义如下:
这个方法的作用就是将不是以“local”开头的文件添加到分发缓存中(调用addDistributedUri方法),添加的时候会进行验证。copyFileToRemote方法的作用,就是将本地的文件(file://)上传到HDFS上,copyFileToRemote方法中会进行两个文件系统的对比,只有当源文件系统和目标文件系统不同(不同的HDFS或一个是HDFS一个是普通文件系统)才会进行复制文件。最后调用ClientDistributedCacheManager的addResource方法将文件加入到Resource列表中供后面启动container使用。
分发Keytab文件
|
|
amKeytabFileName是添加了UUID后缀的名字,作用就是调用distribute方法,将keytab文件分发,但是只是分发给Application Master。
jar文件的分发逻辑
在对Jar进行分发的时候,会有三种情况:
设置了spark.yarn.archive
设置了spark.yarn.jars
spark.yarn.archeive和spark.yarn.jars都没有设设置
当同时设置了spark.yarn.archive和spark.yarn.jars时,spark.yarn.archive优先级高。
spark.yarn.archive
如果配置了spark.yarn.archive(目录,且要求必须不是以local开头的文件系统),则将spark.yarn.archive配置的目录作为ARCHIVE类型的资源分发到“spark_libs”目录中。
spark.yarn.jars
如果配置了spark.yarn.jars(必须是文件,多个文件用逗号分隔),则将spark.yarn.jars中的每个文件作为FILE类型资源分发到“spark_libs”目录中。
对于local文件(以local://)开头的文件,会将其重新设置到sparkConf中的“spark.yarn.jars”配置项中。
上传${SPARK_HOME}下的jars
对于既没有配置spark.yarn.archive又没有配置spark.yarn.jars,那么系统会将环境变量${SPARK_HOME}中指定的目录下的jars或assembly/target/scala-%{scala_version}}/jars目录中的jar打包为一个“spark_libs.zip”文件,然后将这个文件作为ARCHIVE类型分发到“spark_libs”目录。
看了三种情况,这里有一个问题,对于配置了spark,yarn.archive和什么都没有配置的情况,都是将URI作为ARCHIVE类型资源分发到“spark_libs”目录中,那么这两个参数还有什么作用么?其实最主要的区别就是在进行文件拷本的时候,也就是调用copyFileToRemote方法的时候,可以减少上传操作。
如下是jar文件分发的逻辑实现:
看到代码的实现,这里还有一个问题,对于配置spark.yarn.jars的情况,如果其中的jar是以“local://”开头的,会将这些文件加入到列表中,然后用这个列表来重新设置spark.yarn.jars的配置,那么后续还会怎么处理呢?也就是说对于以local://开头配置的文件,要怎么处理呢?
分发用户Jar包
对于用户的jar包(通过–jar参数指定的),会将其以“app.jar”进行分发
其他需要分发的
除了上面那些分发的内容,用户还可以设置spark.yarn.dist.jars、spark.yarn.dist.files或spark.yarn.dist.archives配置项来制定自己要分发的文件,对于spark.yarn.dist.jars指定的jar,会被添加到classPath中,另外两个不会。需要注意的是这些文件不能和之前上传的URI或文件名相同,否则不会分发。然后这些值会作为配置项“spark.yarn.secondary.jars”的信息进行设置。对于那些AM不需要,但是executor需要的jar,可以通过这种方式来配置。
配置项的上传
除了上面的jar包、文件的分发,系统还会上传,配置项会被上传到(spark.yarn.stagingDir|~)/.sparkStaging/application_id/spark_conf.zip位置。
上传的内容配置内容如下,“spark.overlay.hadoop.conf.filenames”配置项中指定的配置文件、log4j.properties、metrics.properties、环境变量“HADOOP_CONF_DIR”目录中的文件、环境变量“YARN_CONF_DIR”目录中的文件。其中如果有相同的配置文件,那么最后两个环境变量中的文件优先级最低。除了这些文件,系统还会将内存中sparkConf的属性和Keytab文件中的属性,以“spark_conf.properties”进行保存,一起添加到压缩文件spark_conf.zip中,一起上传到spark.yarn.stagingDir|~)/.sparkStaging/application_id/spark_conf.zip位置。
这些资源如何传递给container呢?答案是生成ContainerLaunchContext时(Records.newRecord(classOf[ContainerLaunchContext])–这样创建),作为localResources属性设置的。除了localResource,还有一个environment需要设置,环境信息和资源信息一样,也会上传到“(spark.yarn.stagingDir|~)/.sparkStaging/application_id/”目录下。
启动命令的拼接
启动命令的拼接就是为了要拼成 /bin/java -server ${-javaOpts} ${amArg}这样的命令来启动另外一个java类。对于要启动哪个类,如下判断:
对于集群模式将会启动org.apache.spark.deploy.yarn.ApplicationMaster对象,赋予非集群模式将启动org.apache.spark.deploy.yarn.ExecutorLauncher对象。
对于集群模式,启动ApplicationManster,这样就与之前ApplicationMaster的运行流程连接起来。
创建Application提交上下文
创建Container提交上下文是通过createApplicationSubmissionContext方法实现的。与其说是创建不如说设置,因为这个上下文是通过newApp.getApplicationSubmissionContext得到的,而newApp是通过yarn客户端调用createApplication得到的。
这个方法对Context设置的信息如下
context属性 | 取值 |
---|---|
applicationName | spark.app.name配置项,默认为Spark |
queue | spark.yarn.queue配置项 |
AMContainerSpec | 上面生成的ContainerLaunchContext |
applcationType | “SPARK” |
applicationTags | spark.yarn.tags配置项,如果是多值,逐个设置 |
maxAppAttempts | spark.yarn.maxAppAttempts配置项 |
attemptFailuresValidityInterval | spark.yarn.am.attemptFailuresValidityInterval配置项 |
AMContainerResourceRequest | 新生成的ResourceRequest对象,ResourceRequest对象包含的信息,如下表,该配置只有在spark.yarn.am.nodeLabelExpression设置时有效 |
resource | 新生成的Resource对象,包括所需的内存和CPU,该配置只有在spark.yarn.am.nodeLabelExpression没有设置时有效 |
logAggregationContext | 新生成的LogAggregationContext,只有在spark.yarn.rolledLog.includePattern设置时有效 |
ResourceRequest包含的信息如下
|属性|取值|
|-|-|
|resourceName|*,等|
|priority|Priority对象,默认为0|
|capability|Resource对象,包含内存和cpu|
|numContainers|所需container的数量|
|nodeLableExpression|节点标签表达式 —- 这个在申请资源时有什么作用|
将上面补充好的ApplicationSubmissionContext对象作为参数,通过yarn客户端的submitApplication方法,就完成了application的提交。
得到Application提交上下文之后,便可以调用yarnClient来提交Application。