Submitting Applications

本文是对spark文档之Submitting Applications章节的翻译,原文档连接
在Spark的bin目录下的spark-submit脚本被用来发布应用程序到集群中。它能够使用所有被Spark支持的cluster managers的统一接口,因此你不需要为每个application进行配置。

Bundling Your Application’s Dependencies(绑定你的应用程序的依赖)

如果你的代码依赖其他项目,你需要将它们打包到你的应用程序中,以便将它分发到集群。为了这样做,需要创建一个assembly jar(或)来包含你的代码和代码的依赖。sbt和Maven都有assembly插件。当创建assembly jar的时候,作为被提供的依赖列出Spark和Hadoop;这些不需要被捆绑,因为他们在运行时,cluster管理器会提供。一旦你有了一个assembly的jar包,你能够调用bin/spark-submit脚本在传递你的jar时进行解析。
对于Python,你可以使用spark-submit的–py-files参数来添加.py,.zip或.egg的文件,来发布你的application。如果你依赖多个Python文件,我们推荐将它们打包到.zip或.egg中。

Launching Applications with spark-submit

一旦一个用户application被捆绑,那么这个application可以使用bin/spark-submit脚本来发布。这个脚本需要使用Spark和它的依赖来设置classpath,并支持不同的cluster manager和Spark支持的部署模式:

1
2
3
4
5
6
7
./bin/spark-submit \
--class <main-class> --master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... #other options
<application-jar> \
[application-arguments]

一些常用的选项:

1、–class:你的应用程序的入口点(例如:org.apache.spark.examples.SparkPi)。
2、–master:集群的master URL(例如:spark://23.195.26.187:7077)。
3、–deploy-mode:部署你的程序到worker节点(cluster),还是作为一个外部client(client),默认是client。
4、–conf:以key=value的格式指定任意的Spark配置。对于包含空格的值,要包含在引号中”key=value”。
5、application-jar:包含你应用程序和所有依赖的路径。这个URL在你的集群中必须是全局可见的。例如,一个hdfs://路径或file://路径对于所有的节点都是存在的。
6、application-arguments:任何需要传递到主类中main方法的参数。

一个常用的部署策略是提交你的应用程序到一个跳板机,这个跳板机是一个与你的worker机器(例如,再一个standalone EC2集群的Master节点)同地协调的机器。在这个设置中,client模式是适当的。在client模式中,驱动器在spark-submit进程中直接的启动,它的行为类似一个client连接到cluster。application的输入和输出被输出到控制台。因此,这种模式对于那种包含REPL(Read-Eval-Print-Loop)(例如,Spark shell)的application是特别适合的。
作为另一种方案,如果你的application是从一个距离worker机器很远的机器上提交的(如,你的本地笔记本),它通常要使用cluster模式来减少drivers和executors之间的网络因素。注意,对于Mesos集群,cluster模式是不能够被正确支持的。当前只有YARN为Python application支持cluster模式。
对于Python application,可以简单的在位置传递一个.py文件来代替一个JAR,并使用–py-files添加Python .zip、.egg或.py文件到查找路径。
这里有一些特殊选项可以被cluster manager来使用。例如,在Spark stadalone cluster中使用cluster部署模式,你也能够使用–supervise来确定driver在非0退出代码失败的时候自动重启。要列举出所有spark-submit可用的选项,可以用–help来运行spark-submit即可。以下有一些常用选项的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8]\
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100\
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster --supervise --executor-memory 20G \
--total-executor-cores 100\
/path/to/examples.jar \
1000
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50\
/path/to/examples.jar \ 1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster --supervise --executor-memory 20G \
--total-executor-cores 100\
http://path/to/examples.jar\
1000

Master URLs

Master URL Meaning
local 使用一个worker线程,以本地方式运行Spark(完全不是并行的)
local[K] 使用K个线程,以本地方式运行Spark(概念上,设置这个值为你的机器的核数)
locak[*] 使用和你机器上逻辑核数相同数量的worker,以本地方式运行
spark://HOST:PORT 连接到给定的Spark standalone集群的master。这个port必须是你master配置使用的或默认值7077二者之一
mesos://HOST:PORT 连接到给定的Mesos集群。port必须是你的配置的值或默认值5050二者之一。或者对于一个Mesos集群使用ZooKeeper,使用mesos://Zk://…使用deploy-mode cluster来提交,HOST:PORT应该被配置到MesosClusterDispatcher
yarn 以client或cluster模式连接到YARN集群,连接模式依赖–deploy-mode的值。集群的位置将基于HADOOP_CONF_DIR或YARN_CONF_DIR变量的值进行查找
yarn-client 等同于使用–deploy-mode client的yarn,但应该优先使用yarn
yarn-cluster 等同于使用–deploy-mode cluster的yarn,但应该优先使用yarn

从一个文件中加载配置信息

spark-submit脚本能够从一个配置文件中加载默认的Spark配置变量,并传递它们到你的application。默认,它将从Spark目录conf/spark-defaults.conf中读取选项。对于更多的细节,查看loading default configurations章节获取更多细节信息。
加载默认Spark配置,这种方式能够为spark-submit设置必要的标识。例如,如果spark.master属性被设置,你可以从spark-submit中安全的省略–master标识。一般,设置在SparkConf上的配置值有最高的优先级,然后标识被传递给spark-submit,然后是默认文件中的值。

如果你不清楚配置选项的出处,你可以通过–verbose选项运行spark-submit来打印出调试信息。

Advanced Dependency Management

当使用spark-submit的时候,application jar和其他使用–jar选项进行包含的jar包将自动转发到集群。Spark使用如下的URL模式来为广播jar提供不同的策略:

1、file: - 绝对路径,并且file:/URIs通过driver的HTTP文件服务器提供服务,每个executor从driver HTTP server拉取文件。
2、hdfs:、http:、https:、ftp: - 这些从预期的URI拉取文件和JARs。
3、local: - 一个以local:开始的URI,期望作为一个本地文件存在于每个worker节点上。这意味着不会产生网络IO,并且将大文件或Jar推送到每个worker、或使用NFS、GlusterFS进行共享的方式工作的更好。

注意,那些为executor节点上每个SparkContext而拷贝到工作目录的JARs和文件。可以使用一个有意义的空间超时,并在需要的时候进行清理。使用YARN,清理是自动处理的,使用Spark standalone,清理工作可以使用spark.worker.cleanup.appDataTtl属性来配置。
用户可能通过使用–packages来指定以逗号分隔的maven坐标列表来包含其他依赖。附加的库(或resolvers在SBT中)可以使用–repositories标识以一个逗号分隔的方式来添加。这些命令能够在pyspark、spark-shell和spark-submit中使用来包含Spark Packages。

More Information

一旦你部署了你的application,cluster mode overview描述了与分布式执行的有关的组件以及如何监控和调试application。

使用实例

不引用spark之外的第三方jar包的application的编译和submit,请参考的使用实例

引用spark之外的第三方jar包的application,打包的方式有两种:

  • 将整个application打成一个assembly的jar,将spark之外的jar都包含在该jar中
  • 仍然打成只包含application本身的jar包,引用的第三方jar包通过其他方式进行添加。

将整个jar包打成一个assembly的jar来包含第三方jar包

从一个包含json数据格式的文件中读取数据,并计算数据的content中各个字的统计

手动构建项目结构
1
2
3
4
5
6
7
8
9
10
11
12
13
./src
./src/test
./src/test/resources
./src/test/scala
./src/test/java
./src/main
./src/main/resources
./src/main/scala
./src/main/scala/com
./src/main/scala/com/antispam
./src/main/scala/com/antispam/WordCount.scala
./src/main/java
./build.sbt
创建applcation文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
vim src/main/scala/com/antispam/JsonWordCount.scala
package com.antispam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import net.sf.json.JSONObject
import java.util.Map
import java.util.HashMap
object JsonWordCount {
def main(args:Array[String]) {
val logFile = "hdfs:///tmp/rwm/testData.log"
val conf = new SparkConf().setAppName("json word count application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile)
val rdd1 = logData.map(obj => (JSONObject.fromObject(obj).get("account").asInstanceOf[String], 1))
val rdd2 = rdd1.reduceByKey(_ + _)
rdd2.saveAsTextFile("hdfs:///tmp/rwm/res")
}
}
创建sbt构建文件
1
2
3
4
5
6
7
8
9
10
vim build.sbt
name := "wordCount"
organization := "com.antispam"
version := "0.0.1"
scalaVersion := "2.11.0"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"
//这里需要这样写,否则会报无法下载jar的异常
libraryDependencies += "net.sf.json-lib" % "json-lib" % "2.4" from "http://repo1.maven.org/maven2/net/sf/json-lib/json-lib/2.4/json-lib-2.4-jdk15.jar"
创建插件配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import AssemblyKeys._
assemblySettings
jarName in assembly := "wordcount_ass_sbt.jar" //jar的名称
mainClass in assembly := Some("com.antispam.JsonWordCount") //主类信息
//jar包中类冲突的合并策略
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case x => old(x)
}
}
使用sbt进行打包
1
sbt assembly
提交任务到spark集群
1
spark-submit --class com.antispam.JsonWordCount --master spark://spark-002.yz:8081 wordcount_ass_sbt.jar

打独立的jar包,将第三方jar包通过其他方式来发布

除了使用assembly插件将引用的第三方jar中的class添加到要打出的jar中之外,还可以通过上面文章中介绍的其他的方式在提交任务时使用配置参数来引用第三方jar包

  • 使用–jar将本地的jar文件分发到集群
  • 使用–packages将jar的Maven坐标分发给集群,多个Maven坐标可以使用逗号分隔。
  • 将所需的jar包添加到spark的classPath目录中。