Configuration

本文是对Spark配置的翻译,主要用于本人学习使用,原文请参考

Spark提供了三个用于对系统配置的位置:

  • Spark properties控制大多数application参数,可以通过使用SparkConf对象设置或通过Java系统属性设置。
  • Environment variables可以设置每台机器的设置,如IP地址,通过每个节点上的conf/spark-env.sh脚本。
  • Logging可以通过log4j.properties来配置。

Spark Properties

Spark属性控制大多数application设置,并且对每个application进行独立配置。这些属性可以直接在SparkConf上设置,SparkConf会传递给你的SparkContext.SparkConf,来允许你控制一些常用属性(如master的URI和application的名称等),通过set()方法达到和key-value对一样。例如,我们可以使用两个线程来初始化一个application,如下:
注意我们使用local[2]运行,意味着两个线程-表示最低的并行,这样能够帮助我们发现那些只有在分布式context上运行才会出现的bug。

1
2
3
4
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)

注意在本地模式中我们可以使用多个线程,但是在像Spark Streaming中,我们实际上要求使用多个线程,来避免任何饥饿情况的发生。
指定时间属性时需要配置时间单位,下面的格式是可以接受的:

1
2
3
4
5
6
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

指定字节大小的属性应该配置一个大小单位,下面的格式是可以接受的:

1
2
3
4
5
6
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

Dynamically Loading Spark Properties

在一些情况中,我们可能想要避免在SparkConf中硬编码某一配置。例如,如果你想要以不同的master或不同数量的内存来运行相同的application。Spark允许你简单的创建一个空的配置对象:

1
val sc = new SparkContext(new SparkConf())

然后,你可以在运行时提供配置值:

1
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark shell和spark-submit工具支持两种方式来动态加载配置信息。第一种是命令行选项,诸如上面展示的–master。spark-submit能够接收任何使用–conf标识指定的Spark属性,但是对于作为Spark application启动一部分的属性使用特殊标识。运行./bin/spark-submit –help将展示选项的全部列表。
bin/spark-submit还将从conf/spark-defaults.conf中读取配置选项,在这个文件中,每行包含使用空格符分隔的一个key和一个value。例如:

1
2
3
4
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer

任何通过标识或子属性文件中指定的值将会传递到application并通过SparkConf进行合并。直接在SparkConf中设置的属性拥有最高优先级,然后是通过spark-submit或spark-shell传递的标识,最后是spark-defaults.conf。少部分的keys已经从早期的版本中被重命名了;在这种情况中,旧的key的名字仍然可以接受,但是比任何新key的实例的优先级都低。

Viewing Spark Properties

Spark的web UI(地址为http://:4040)中的”Environment”tab页中累出了Spark的属性。这个地方用来检查你的属性是否已经被正确设置是很有用的。注意,只有通过spark-default.conf、SparkConf或命令行指定的值才会出现在这里。对于其他的值,你可以假设它们使用的是默认值。

Available Properties

大多数控制内部设置的属性都设置了合理的默认值。一些常用的选项是如下设置:

Application Properties

Property Name Default Meaning
spark.app.name (none) 你的application的名称。这将会出现在UI和日志数据中
spark.driver.cores 1 用于driver进程的核的数量,只在集群模式中有用
spark.driver.maxResultSize 1g 为每个Spark action(如collect)限制全部parition的序列化结果的总大小。至少应该为1M或者设置为0表示不限制。如果Job的Result的总大小超过这个限制,Job将会被中断。设置一个高的限制可能会引发driver的内存溢出(依赖spark.driver.memory和JVM中对象的内存开销)。设置一个合适的值可以保护driver远离内存溢出错误
spark.driver.memory 1g 用于driver进程的总内存数,例如SparkContext在多大内存中初始化(如:1g、2g)。注意:在client模式中,这个配置不能在你的application中通过SparkConf直接设置,因为在这个时候driver JVM已经启动了。应该通过–driver-memory命令行选项或在你的默认属性文件中配置来设置。
spark.executor.memory 1g 每个executor进程使用的内存总数
spark.extraListeners (none) 一个以逗号分隔的实现了SparkListener的类的列表;当初始化SparkContext时,这些类使用Spark的listener bus被创建并和注册。如果类有只接收SparkConf对象作为参数的构造函数,则会调用这个构造函数,否则调用无参的构造函数。如果没有找到有效的构造函数,SparkContext将创建失败并抛出异常
spark.local.dir /tmp 在Spark中用于”打草稿”的空间,包括mpa输出文件和在磁盘上进行排序的RDD。这应该是你系统中的快盘。它也可以是一个以逗号分隔的多个不同磁盘上的不同目录的列表。注意:在Spark1.0和后续版本中,这将被集群管理器设置的SPARK_LOCAL_DIRS(standalone, Mesos)或LOCAL_DIRS(YARN)环境变量重写。
spark.logConf false 当一个SparkContext被启动时,以INFO级别对起作用的SparkConf记录日志。
spark.master (none) 指定要连接的集群管理器。查看允许的masterURL列表
spark.submit.deployMode (none) Spark driver程序的部署模式,”client”或”cluster”,它们表示在集群中的一个节点上本地(“client”)还是远程(“cluster”)启动driver程序

除了这些,下面的属性也是可用的,并且可能在一些问题中有用:

Runtime Environment

Property Name Default Meaing
spark.driver.extraClassPath (none) 额外的classpath项,用来追加到driver的classpath之前。注意:在client模式中,这个配置必须不能通过在你的application中的SparkConf直接设置,因为driver JVM在这个时候已经启动了。应该通过–driver-class-path命令行选项类设置或在你的默认属性文件中设置
spark.driver.extraJavaOptions (none) 传给driver的额外的JVM选项字符串。例如GC设置或其他日志记录。需要注意的是使用这个选项来设置heap的的最大值(-Xmx)是非法的。heap最大值,在集群模式中可以使用spark.driver.memory设置,在client模式中可以通过–driver-memory命令行选项来设置。注意:在client模式中,这个配置必须不能通过在你的application使用SparkConf直接设置,因为在这个时刻,driver JVM已经启动。应该通过–driver-java-options命令行选项来设置,或者在你的默认属性文件中设置
spark.driver.extraLibraryPath (none) 当启动driver JVM时,设置一个特殊的library路径提供使用。注意,在client模式中,这个配置必须不能通过你的applicaiton中的SparkConf直接设置,因为这个时候driver JVM一就能够启动,应该使用–driver-java-options命令行选项来设置,或在你的默认属性文件中设置
spark.driver.userClassPathFirst false (实验性的)在driver中加载类时,是否给用户添加的jars比Spark自身的jars更高的优先级。这个功能可以用来减缓Spark依赖和用于依赖之间的冲突。它当前还是实验性的,并且只能用于集群模式中
spark.executor.extraClassPath (none) 额外的classpath记录预先添加到executors的classpath。它的存在是为了向后兼容老版本的Spark。用户通常不需要设置这个选项
spark.executor.extraJavaOptions (none) 传递给executors的额外的JVM选项的字符串。例如,GC设置或其他日志记录。注意,使用这个选项来设置Spark的属性或设置heap最大值是违法的。Spark的属性应该使用SparkConf对象设置或使用park-submit脚本时使用spark-defaults.conf来设置。heap最大值应该通过spark.executor.memory来设置
spark.executor.extraLibraryPath (none) 在启动executor JVM时设置一个特殊的library路径
spark.executor.logs.rolling.maxRetainedFiles (none) 设置由系统保留的日志文件的个数。较老的日志文件将被删除。默认不可以
spark.executor.logs.rolling.maxSize (none) 设置executor日志滚动的最大字节大小,超过这个大小的日志将被滚动。滚动默认是被禁用的。自动清理老的日志请查看spark.executor.logs.rolling.maxRetainedFiles
spark.executor.logs.rolling.strategy (none) 设置executor日志滚动策略。默认它是禁用的。它可以设置为”time”(按时间滚动)或”size”(按大小滚动)。对于”time”,使用spark.executor.logs.rolling.time.interval来设置滚动间隔;对于”size”,使用spark.executor.logs.rolling.maxSize来设置滚动的最大文件size
spark.executor.logs.rolling.time.interval daily 设置executor日志滚动的时间间隔。默认滚动是禁用的。有效的值有daily、hourly、minutely或任何以秒为单位的间隔数。老日志的自动清理请参考spark.executor.logs.rolling.maxRetainedFiles
spark.executor.userClassPathFirst false (实验性的)与spark.driver.userClassPathFirst功能相同,只是应用在executor实例上
spark.executorEnv.[EnvironmentVariableName] (none) 添加由EnvironmentVariableName指定的环境变量到executor进程。用户可以指定多个这样的变量来设置多个环境变量
spark.python.profile false 在Python worker中启用性能分析,描述结果将通过sc.show_profiles()显示出来,或在driver退出之前显示。描述结果也可以通过sc.dump_profiles(path)方法dump到磁盘。如果一些描述结果被人为禁用了,在driver退出之前它们不会被显示。默认使用pyspark.profiler.BasicProfiler,但是可以通过传递一个profiler类给SparkContext构造器来重写这个profiler
spark.python.profile.dump (none) 在driver退出之前,描述结果dump到的目录。描述结果会将每个RDD作为单独的文件进行dump。它们可以通过ptats.Stats()进行加载。如果这个值被指定了,那么描述结果将不会自动被显示
spark.python.worker.memory 512m 聚合期间,每个python worker进程使用的内存总数,使用和JVM内存字符串相同的格式(如 512m,2g)。如果在聚合期间内存的使用超过这个值,将会溢出数据到磁盘
spark.python.worker.reuse true 是否重用Python worker。如果设置为true,将使用固定数量的Python worker,不需要为每个task来fork()一个Python进程。如果有一个很大的广播,这将是非常有用的,那么这个广播就不需要为每个task将JVM装换为Python worker
spark.files 以逗号分隔的文件列表,用于放置每个executor的工作目录
spark.submit.pyFiles 以逗号分隔的.zip、.egg或.py文件的列表。用于为Python apps设置PYTHONPATH
spark.jars 以逗号分隔的本地jars的列表,以便将这些jars包含到driver和executor classpath中
spark.jars.packages 以逗号分隔的jars的Maven坐标,以便将这些jars包含到driver和executor classpath中。查询的顺序是本地库、mavne中最后是通过spark.jars.ivy指定的远程库。坐标的格式为groupId:artifactId:version
spark.jars.excludes 以逗号分隔的groupId:artifactId列表,用于在解析由spark.jars.packages提供的依赖时进行排除,以避免依赖冲突
spark.jars.ivy 以逗号分隔的附加远程库的列表,用来搜索spark.jars.packages中提供的坐标的jars

Shuffle Behavior

property Name Default Meaning
spark.reducer.maxSizeInFlight 48m 从每个reduce任务同时拉取map输出的最大size。因为每个输出要求我们创建一个buffer来接收它,这表示每个reduce任务一个固定内存的开销,因此保持它小,除非你有很大的内存
spark.reducer.maxReqsInFlight Int.MaxValue 这个配置限制了在任何点获取blocks的远程请求的数量。随着集群主机的数量的增长,可能会导致到一个或多个节点的拨入连接数量很大,引发workers负载下的失败。通过限制获取请求的数量,可以环节这种情况
spark.shuffle.compress true 是否加密map的输出文件。通常是一个好主意。压缩将事宜哦那个spark.io.compression.codec
spark.shuffle.file.buffer 32k 每个shuffle文件输出流的内存buffer的大小。这些buffer在创建中间shuffle文件过程中降低了磁盘寻址的次数和系统调用
spark.shuffle.io.maxRetries 3 如果这个值设置为非零值,那么由于IO相关异常而失败的获取将会自动重试。这个重试逻辑帮助稳定shuffle面对长s时间的GC暂停或短暂的网络连通的问题
spark.shuffle.io.numConnectionsPerPeer 1 对于大集群,为了减少连接的建立,主机之间的连接会被重用。对于有用很多磁盘,但有很少主机的集群,这可能会导致到所有磁盘的并发不足,因此用户可能会考虑增加这个值
spark.shuffle.io.preferDirectBufs true shuffle和cache block transfer期间用于垃圾回收的off-heap huffer。对于off-heap内存被紧紧限制的环境,用户可能希望将其关闭来触发来自Nettry的分配到on-heap中
spark.shuffle.io.retryWait 5s fetch的重试之间等待的时长。默认由重试引发的最大延迟为15秒,是通过maxRetries * retryWait计算出来的
spark.shuffle.service.enabled false 启用外部的shuffle服务。这个服务保存了由executors写的shuffle文件,因此executors能够被安全移除。如果spark.dynamicAllocation.enabled设置为”true”,那么该功能必须启用。为了启用它,外部的shuffle服务必须被启动。查看dynamic allocation configuration and setup documentation获取更多信息
spark.shuffle.service.prot 7337 外部shuffle服务运行的端口号
spark.shuffle.sort.bypassMergeThreshold 200 (高级的)在基于排序的shuffle管理器中,如果没有map端的聚合,避免合并排序数据,并且最多有这么多的reduce partition
spark.shuffle.splill.compress true 在shuffle期间,是否对溢出的数据进行压缩。压缩将使用spark.io.compression.codec

Spark UI

Property Name Default Meaning
spark.eventLog.compress false 如果spark.eventLog.enabled为true,是否对记录的event进行加密
spark.eventLog.dir file:///tmp/spark-events 如果spark.eventLog.enabled为true,Spark events被记录的基本目录。在这个基本目录中,Spark为每个application创建一个子目录,并记录application的events到这个目录中。用户可能想要设置这个值为一个统一的位置,例如一个HDFS目录,因此历史文件可以被历史服务器读取
spark.eventLog.enable false 是否记录Spark events,用于在application完成之后重新构建Web UI
spark.ui.killEnabled true 允许计划和对应的jobs可以通过web ui被kill掉
spark.ui.port 4040 application的报表图的端口,用来显示内存和负载数据
spark.ui.retainedJobs 1000 在垃圾回收之前,被Spark UI和status APIs记住的job的数量
spark.ui.retainedStages 1000 在垃圾回收之前,被Spark UI和status APIs记住的计划的数量
spark.worker.ui.retainedExecutors 1000 在垃圾回收之前,被Spark UI和status APIs记住的已完成的executors的数量
spark.worker.ui.retainedDrivers 1000 在垃圾回收之前,被Spark UI和status APIs记住的已完成的drivers的数量
spark.sql.ui.retainedExecutors 1000 在垃圾回收之前,被Spark UI和status APIs记住的已完成的executions的数量
spark.streaming.ui.retainedBatches 1000 在垃圾回收之前,被Spark UI和status APIs记住的已完成的batches的数量
spark.ui.retainedDeadExecutors 100 在垃圾回收之前,被Spark UI和status APIs记住的死掉的executors的数量

Compression and Seriallzation

Property Name Default Meaning
spark.broadcast.compress true 在发送广播变量之前是否对其进行压缩。通常是个好主意
spark.io.compression.codec lz4 用户对中间数据(诸如RDD partition、广播变量以及shuffle输出)进行压缩的编码器。默认,Spark提供了3中编码器:lz4、lzf和snappy。你也可以使用完整的类名来指定编码器,如:org.apache.spark.io.LZ4CompressionCodeC、org.apache.spark.io.LZFCompressionCodec和org.apache.spark.io.SnappyCompressionCodec
spark.io.compression.lz4.blockSize 32k 当LZ4压缩编码器使用时,LZ4压缩使用的block的大小。当LZ4被使用时,降低这个block的大小也会降低shuffle内存的使用
spark.io.compression.snappy.blockSize 32k 当使用Snappy压缩编码器时,Snappy压缩使用的block的大小。当Snappy被使用时,降低这个block的大小也会降低shuffle内存的使用
spark.kryo.classesToRegister (none) 如果你使用了Kryo序列化,给出一个以逗号分隔的自定义类名的列表,这些类要使用Kryo注册。查看tuning guide获取更多细节
spark.kryo.referenceTracking true(当使用Spark SQL Thrift Server时为false) 当使用Kryo序列化数据时,是否跟踪相同对象的引用,如果你的对象图有环那么这是必须的,并且如果相同对象包含多个拷贝,这对性能也是有用的。如果你知道这不是问题,可以禁用它来提高性能
spark.kryo.registrator (none) 如果你是用Kryo序列化,给出一个以逗号分隔的类的列表,这些类是用来使用Kryo进行注册的自定义类。如果你需要以自定义的方式注册你的类,这是非常有用的,例如,指定一个自定义字段序列化器。否则,spark.kryo.classToRegistor是简单的。查看tuning guide获取更多细节
spark.kryoserializer.buffer.max 64m Kryo序列化huffer的允许的最大大小。这个必须要任何你试图序列化的对象大。如果你在Kryo中得到一个”buffer limit exceeded”,则增加这个值
spark.kryoserializer.buffer 64k Kryo序列化buffer的初始大小。注意在每个worker上一个core一个buffer。如果需要,这个值会增长到spark.kryoserializer.buffer.max
spark.rdd.compress false 是否压缩序列化的RDD(例如,在Java或Scala中的StorageLevel.MEMORY_ONLY_SER或Python中的StorageLevel.MEMORY_ONLY)。耗费额外的CPU来节省大量的空间
spark.serializer org.apache.spark.serializer.JavaSerializer (当使用Spark SQL Thrift Server时是org.apache.spark.serializer.KryoSerializer) 用于对那些需要通过网络发送的对象或需要以序列化格式缓存的对象进行序列化的类。默认使用Java序列化生成器但是它太慢了,当你要求速度的时候,我们推荐使用org.apache.spark.serializer.KryoSerializer并配置Kryo序列化。该值可以是任何org.apache.spark.Serializer的子类
spark.serializer.objectStreamReset 100 当使用org.apache.spark.serializer.JavaSerializer进行序列化时,序列化生成器会缓存对象来防止写冗余数据,然而这会停止这些对象的垃圾回收。通过调用reset你可以从序列化生成器中刷新那些信息,从而允许旧的对象被收集。要关掉这个周期操作,可以将其设置为-1。默认情况下,将会没100个对象reset一次序列化生成器

Memory Management

Property Name Default Meaning
spark.memory.fraction 0.6 用于执行和存储的分数(300MB的heap空间)。这个值越低,溢出和数据缓存回收发生的就越频繁。这个配置的目的是为内部元数据、用户数据结构和很少情况中不精确size的估算、异常大的记录设置一个预留内存。推荐使用默认值。更多细节,包括关于当增加这个值时正确调节JVM垃圾回收的重要信息,查看这个描述
spark.memory.storageFraction 0.5 免于回收的存储内存的总数,以spark.memory.fraction设置的预留区域大小的小数来表示。这个值越高,用于执行的工作内存也少,并且tasks溢出到磁盘可能越频繁。推荐使用默认值。更多细节,请参考这个描述
spark.memory.offHeap.enabled false 如果为true,Spark将会试图为某些操作使用off-heap内存。如果off-heap内存被启用,那么spark.memory.offHeap.size必须确定
spark.memory.offHeap.size 0 可以被用于off-heap分配的内存的绝对值,以字节为单位。这个设置对heap内存的使用没有影响,因此如果你的executor的总内存的消耗必须适合一些硬件的限制,然后确保相应的缩小你的JVM heap大小
spark.memory.useLegacyMode false 是否启用遗留的内存管理模式,用于Spark1.5及之前版本。这个遗留模式,严格的划分heap空间到固定大小区域,如果applicaiton没有优化,可能会导致过多的溢出。除非该设置启用,否则不需要了解下面的废弃的内存配置:spark.shuffle.memoryFraction spark.storage.memoryFraction spark.storage.unrollFraction
spark.shuffle.memoryFraction 0.2 废弃
spark.storage.memoryFraction 0.6 废弃
spark.storage.unrollFraction 0.2 废弃

Execution Behavior

Property Name Default Meaning
spark.broadcast.blockSize 4m 一个TorrentBroadcastFactory block的每个碎片的大小。这个值太大,在广播期间会降低并发性(使它变慢)。然而,如果太小,BlockManager可能会有性能问题
spark.executor.cores 在YARN模式中为1,在standalone和Mesos coarsegrained模式中为worker上所有可用的core 每个executor使用的core的数量。在standalong和Mesos coarse-grained模式中,设置这个参数可以允许一个application在用一个worker上运行多个core,假如worker上有足够的core。否则,在每个worker上每个application只能运行一个executor
spark.default.parallelism 对于分布式shuffle操作(像reduceByKey和join),一个父级RDD的最大partitions的数量。对于像parallelize这样没有父级RDD的操作,依赖于集群管理器:+ 本地模式:本机上core的数量 + Mesos fine grained模式:8 + 其他:所有executor节点上的core的总和或2,去较大值 在RDDs中由transformations(如join、reduceByKey和parallelize)返回的默认partitions的数量,如果用户没有设置这个数量的话。
spark.executor.heartbeatInterval 10s 每个executor到driver的心跳间隔。心跳使driver知道executor仍然活着,并使用metrics为进程内的tasks更新driver
spark.files.fetchTimeout 60s 当从driver通过SparkContext.addFile()获取文件时使用的通信超时时间
spark.files.useFetchCache true 如果设置为true(默认),文件的拉取将使用一个本地缓存,这个本地缓存被属于相同application的executors共享,当在同一个主机上运行多个executors时,能够增进task的启动性能。如果设置为false,这些缓存的优化将被禁用,所有的executors将会获取它们自己的文件拷贝。这个优化被禁用可能是为了使用Spark本地目录,这些目录位于NFS文件系统(查看SPARK-6313获取更多细节)上
spark.files.overwrite false 通过SparkContext.addFile()添加文件时,如果目标文件已经存在并且它的内容和源文件不匹配,是否对文件重写
spark.hadoop.cloneConf false 如果为true,为每个task复制一个新的Hadoop Configuration对象。该选项应该启用来解决对Configuration有线程安全的问题(查看SPARK-2546获取更多细节)。该功能默认是禁用,为了避免不会受那些问题影响的job会有异常的性能下降
spark.hadoop.validateOutputSpecs true 如果设置为true,验证输出规范(如,是否检查输出目录已经存在),该验证用于saveAsHadoopFile和其他变体形式中。该功能可以被禁用,在输出目录预先存在时不抛出异常。我们建议用户不要禁用此功能,除非是尝试与之前版本的Spark进行兼容。简单的通过手动使用Hadoop的文件系统API来删除输出目录。这个设置会被由Spark Streaming的StreamingContext生成的Job忽略,因为在checkpoint恢复期间,数据可能需要重新写入到已存在的输出目录中
spark.storage.memoryMapThreshold 2m 当从磁盘读取一个block时,Spark将block映射到内存,该值为映射内存的大小。这可以放置Spark内存映射到非常小的block。通常内存映射需要很高的负载来关闭blocks或降低操作系统的page size

Networking

Property Name Default Meaning
spark.rpc.message.maxSize 128 在“控制层”通讯中所允许的最大message的大小;通常只应用在map在executors和driver之间输出信息的大小。如果你以上千个map和reduce任务来运行jobs,可以增加这个值并查看关于RPC message大小的信息
spark.blockManager.port (random) 所有block manager的监听端口。它们存在于driver和executors上
spark.driver.host (local hostname) driver监听的IP地址或主机名。这用于和executors和standalone Master的通信
spark.driver.port (random) driver监听的端口号。用于和executors和standalone Master的通信
spark.network.timeout 120s 所有网络交互的默认超时时间。这个配置将被放置在spark.core.connection.ack.wait.timeout、spark.storage.blockManagerSlaveTimeoutMs、spark.shuffle.io.connectionTimeout、spark.rpc.askTimeout或spark.rpc.lookupTimeout中,如果这些值没有配置
spark.port.maxRetries 16 在绑定一个端口时,在放弃之前的最大尝试次数。当端口被指定为特定值(非0)时,每个后续的尝试操作在重试之前会在之前尝试的端口号上加一。本质上,这允许尝试的端口号的范围为指定端口号到指定的端口号+ maxRetries
spark.rpc.numRetries 3 在一个RPC任务放弃之前,重试的次数。一个RPC任务最多运行这些次
spark.rpc.retry.wait 3s 一个RPC ask操作在重试之前等待的时长
spark.rpc.askTimeout 120s 一个RPC ask操作在超时之前等待的时长
spark.rpc.lookupTimeout 120s 一个RPC远程端口查找操作在超时之前等待的时长

Scheduling

Property Name Default Meaing
spark.cores.max (not set) 当在standalone部署集群或“coarse-grained”共享模式的Mesos集群上运行时,跨集群(不是从每台机器)为applicaiton请求CPU core的最大数量。如果不设置,则在Spark的standalone集群管理器上使用spark.deploy.defaultCores配置的值,对于Mesos集群管理器则不限制(所有可用的core)
spark.locality.wait 3s 在放弃之前启动本地优势数据的任务的等待时长,然后在一个非本地的节点上启动。这个等待时间会用于其他几个位置级别(process-local、node-local、rack-local和其他)。通过设置spark.locality.wait.node等配置,你也可以为每个级别设置自定义的等待时长。如果你的task很长并且有很少的本地性任务,那么你应该增加这个值,但是通常默认值工作的很好
spark.locality.wait.node spark.locality.wait 为node本地性设置自定义的本地性等待。例如,你可以设置这个值为0来跳过node本地性并立即为rack本地性进行查找(如果你的集群有rack信息)
spark.locality.wait.process spark.locality.wait 为本地性进程自定义本地性等待。这个配置影响了那些在并行executor进程中试图访问缓存数据的tasks
spark.locality.wait.rack spark.locality.wait 为本地性rack自定义本地性等待
spark.scheduler.
maxRegisteredResourcesWaitingTime
30s 在调度开始之前,等待资源注册的最长时间
spark.shceduler.
minRegisteredResourcesRatio
对于YARN是0.8;对于standalone和Mesos coarse-grained模式是0.0 在开始调度之前等待已注册资源的比例(已注册的资源 / 总的期望资源)(在yarn模式中子域按时executors,在standalone模式和Mesos coarsed-grained模式中是CPU cores[对于Mesos coarse-grained模式,期望的资源的总值为’spark.cores.max’的值])。值应该是0.0到1.0之间的double类型值。不管最小比例的资源是否已经到达,只会等待通过配置spark.scheduler.
maxRegisteredResourcesWaitingTime的时间时长
spark.scheduler.mode FIFO 提交到相同SparkContext的jobs之间调度模式。可以设置为FAIR,来使用公平模式以代替队列模式。主要用于多用户的服务中
spark.scheduler.revive.interval 1s 调度器恢复worker资源供tasks运行的间隔长度
spark.speculation false 如果设置为true,执行tasks的推测执行。这表示如果一个或多个tasks在一个阶段运行缓慢,他们将被重新启动
spark.speculation.interval 100ms Spark为tasks进行推测的检测频率
spark.speculation.multiplier 1.5 一个task比中值慢多长时间则可以考虑推测
spark.speculation.quantile 0.75 某个特定阶段在推测启动之前tasks必须完成的百分比
spark.task.cpus 1 每个task分配的core的数量
spark.task.maxFailures 4 job在放弃之前能够失败的task的数量(超过这个数量就放弃)。应该大于等于1.允许重试的数量为该值 - 1

Dynamic Allocation

Property Name Default Meaning
spark.dynamicAllocation.enabled false 是否使用动态资源分配,它会基于工作负载对application注册或注销executor。更多的细节,查看这里的描述。这要求spark.shuffle.service.enabled被设置。如下是相关的配置:spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors和spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.executorIdleTimeout 60s 如果动态分配被启用且一个executors已经空闲的时间超过该值,这个executor将被移除。更多细节,请参考这个描述
spark.dynamicAllocation.
cachedExecutorIdleTimeout
infinity 如果动态分配被启用并且某个缓存了数据块的executor的空闲时间超过这个值,这个executor将会被移除。更多细节,请参考描述
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 如果动态分配启用,初始运行的executor的数量
spark.dyanmicAllocation.maxExecutors infinity 如果动态分配被启用,executors的上限
spark.dynamicAllocation.minExecutors 0 如果动态分配被启用,executors的下限
spark.dyanmicAllocation.schedulerBacklogTimeout 1s 如果动态分配被启用,并且有为启动的tasks积压超过这个时长,将会请求新的executor。更多的细节,请参考描述
spark.dynamicAllocation.
sustainedSchedulerBacklogTimeout
schedulerBacklogTimeout 与spark.dyanmicAllocation.
schedulerBacklogTimeout相同,但是只对后续的executor请求有用。更多细节请参考描述

Security

Property Name Default Meaning
spark.acls.enable false 是否启用Spark的访问控制列表。如果启用,这将检查用户是否有权限浏览或修改job。注意,这要求用户需要知道,如果用户以null访问,那么验证则不能完成。过滤器可以和UI配合使用进行用户的认证和设置
spark.admin.acls Empty 以逗号分隔的用户或管理员的列表,这些用户或管理员能够浏览和修改所有的Spark job。这可以被用来运行一个共享集群,并且有一组管理员或开发人员,他们能够在不正常工作的时候帮助调试。设置”*”在列表中则表示任何用户都有管理员特权
spark.admin.acls.groups Empty 以逗号分隔的组列表,这些组能够浏览和修改所有的Spark jobs。如果你有一组管理员或开发人员来帮助维护和调试底层架构,这样是非常有用的。在列表中放置”*”表示在任何组中的用户都有管理员的特权。用户组从通过spark.user.groups.mapping指定的组映射提供器实例获取组信息。检查spark.user.group.mapping获取更多信息
spark.user.groups.mapping org.apache.spark.security.
ShellBasedGroupsMappingProvider
群组列表,用户通过由接口org.apache.spark.security.
GroupMappingServiceProvider定义的群组映射服务获取,接口的实现通过该属性配置。默认是一个基于unix shell的实现,通过org.apache.spark.security.
ShellBasedGroupMappingProvider提供,它决定了用户所属的群组列表。注意,这个实现只对基于Unix/Linux的环境提供支持。Windows环境当前不支持。然而一个新的平台/协议通过实现org.apache.spark.security.
GroupMappingServiceProvider接口而被支持
spark.authenticate false Spark是否要对内部连接进行认证。如果不是运行在YARN上,查看spark.authenticate.secret
spark.authenticate.secret None 设置用于Spark组件之间认证的密钥
spark.authenticate.enableSaslEncryption false 当启用认证时,启用加密通信。如果不是运行在YARN上且认证被启用,那么这需要被设置
spark.network.sasl.serverAlwaysEncrypt false 对于那些支持SASL认证的服务,禁用非加密的连接
spark.core.connection.ack.wait.timeout 60s 在超时并放弃之前连接等待确认的时长。要避免由GC引发的长时间暂停导致的非意愿性的超时,你可以设置一个较大的值
spark.core.connection.auth.wait.timeout 30s 在超时并放弃之前,连接等待认证的时长
spark.modify.acls Empty 以逗号分隔的用户列表,这些用户将有修改Spark job的权限。默认只有启动Spark job的用户能够修改job(如kill掉job)。在列表中放置一个“*”表示所有用户都可以修改job
spark.modify.acls.groups Empty 逗号分隔的组列表,这些组拥有修改Spark job的权限。假设你有一组来自相同team的管理员或开发人员,这些人想要控制job的权限,那么该配置很有用。将“*”放置在列表中则表示任何人都有修改Spark job的权限。用户的groups可以从由spark.user.groups.mapping指定的映射提供器来获得。查看spark.user.groups.mapping项获取更多细节
spark.ui.filters None 被应用到Spark web UI的过滤器类的名字的列表,以逗号分隔。过滤器应该是一个标准的javax.servlet.Filter。每个filter的参数可以通过设置一个java系统属性来指定:spark..params=’param1=value1,param2=value2’,例如:-Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params=’param1=foo,
param2=testing’
spark.ui.view.acls Empty 以逗号分隔的用户列表,这些用户有浏览Spark web UI的权限。默认只有启动Spark job的用户有浏览权限。将“*”放到列表中,则表示任何用户都可以浏览Spark job
spark.ui.view.acls.groups Empty 以逗号分隔的用户组列表,这些组可以访问Spark web UI来浏览Spark Job的详细信息。如果你有一组管理员或开发人员或用户能够监控spark job的提交,可以使用组功能。在列表中放置“*”,意味着任何组中的用户东iu可以在Spark web ui中浏览Job的明细。用户分组从有spark.ui.groups.mapping指定的组映射提供器的实例中中得到。查看spark.user.groups.mapping项获取更多信息

Encryption

Property Name Default Meaning
spark.ssl.enabled false 是否在所有已支持的协议上启用SSL连接。所有的SSL设置(例如spark.ssl.xxx),其中xxx是一个特殊的配置属性,表示所有已支持的协议的全局配置。为了给某个特殊协议重写全局配置,这些属性必须在明确的协议命名空间中重写。
使用spark.ssl.YYY.XXX来为由YYY表示的特殊协议重写全局配置。当前YYY只能是文件服务fs
spark.ssl.enabledAlgorithms Empty 逗号分隔的算法列表。指定的算法必须被JVM支持。引用的协议列表可以在这个页面找到。注意:如果不设置,它将使用适合JVM的默认算法
spark.ssl.keyPassword None 密钥存储中私钥的密码
spark.ssl.keyStore None 密钥存储文件的路径。该路径可以是绝对路径,也可以是组件启动目录的相对路径
spark.ssl.keyStorePassword None 密钥存储的密码
spark.ssl.keyStroeType JKS 密钥存储的类型
spark.ssl.protocol None 协议名称。该协议必须被JVM支持。协议列表的引用可以在这个页面找到
spark.ssl.needClientAuth false 如果SSL需要客户端认证,则设置为true
spark.ssl.trustStore None 到信任存储文件的路径。该路径可以是绝对路径,也可以是组件启动目录的相对路径
spark.ssl.trustStorePassword None 信任存储的密码
spark.ssl.trustStoreType JKS 信任存储的类型

Spark SQL

执行SET -v命令将会展示SQL配置列表。

1
2
// spark is an existing SparkSession
spark.sql("SET -v").show(numRows = 200, truncate = false)

Spark Streaming

Property Name Default Meaning
spark.streaming.backpressure.enabled false 启用或禁用Spark Streaming的内部反压力机制(从1.5开始)。这将启用Spark Streaming基于当前批次的调度延迟和处理时间来控制接收比率,因此系统的接收只能和系统的处理速度一样快。这将动态设置receivers的最大接收比率。这个值的上限为spark.streaming.receiver.maxRate和spark.streaming.kafka.maxRatePerPartition,如果这些属性被设置
spark.streaming.backpressure.initialRate not set 初始最大接收速率,当反压机制启用时,每个接收器将以这个速率接收第一批数据
spark.streaming.blockInterval 200ms 被Spark Streaming receivers接收的数据,在将数据存储到Spark中之前,拆分这些数据到数据块的间隔。最小的推荐值为50毫秒。查看Spark Streaming编程指南中performance tuning章节获取更多信息
spark.streaming.receiver.maxRate not set 每个receiver接收数据的最大速率(每秒的记录数)。实际上,每个stream每秒最多消费这些条的记录。将该配置设置为0或负数将不会限制这个速率。查看Spark编程指南中的部署指南获取更多细节
spark.streaming.receiver.
writeAheadLog.enable
false 为receivers启用写ahead日志。所有通过接收器接收到的输入数据将被写到ahead日志一般保存,这样将在driver失败之后进行恢复。查看Spark Streaming编程指南中的部署指南获取更多信息
spark.streaming.unpersist true 促使由Spark Streaming生成和保存的RDDs从Spark’s内存中自动取消保存。由Spark Streaming接收的原生数据也会被自动清除。设置这个值为false,将允许原生数据和保存的RDDs能够在streaming application外部访问,因为它们不会被自动清理。但是它带来更高的内存使用的开销
spark.streaming.stopGracefullyOnShutdown false 如果为true,在JVM关闭时会优雅的将streamingContext关闭,而不是立即关闭StreamingContext
spark.streaming.kafka.maxRatePerPartition not set 当使用新的Kafka stream API时,从每个Kafka partition读取数据的最大速率(每秒记录的数量)。查看Kafka Integration guide获取更多细节
spark.streaming.kafka.maxRetries 1 driver为了在每个partition的leader上找到最后的offset(默认值为1,表示driver最多尝试两次),连续尝试的最大数。只能应用于新的Kafka stream API
spark.streaming.ui.retainedBatches 1000 Spark Streaming UI和status APIs能够记住的batches的数量,超过这个数量的将被回收
spark.streaming.driver.writeAheadLog.
closeFileAfterWrite
false 在driver上写完一条ahead日志记录后,是否要关闭对应的文件。当你想要使用S3时(或其他不支持flush的文件系统)设置这个选项为true
spark.streaming.receiver.writeAheadLog.
closeFileAfterWrite
false 在receiver上写完一条ahead日志记录后,是否要关闭对应的文件。当你想要使用S3时(或任何其他不支持flush的文件系统),设置这个选项为true

SparkR

Property Name Default Meaning
spark.r.numRBackendThreads 2 被RBackend用来处理来自SparkR包的RPC访问的线程数
spark.r.command Rscript 用于在集群模式中为driver和workers执行R脚本的执行器
spark.r.driver.command spark.r.command 用户在客户端模式中为driver执行R脚本的执行器。在集群模式中忽略

Deploy

Property Name Default Meaning
spark.deploy.recoveryMode NONE 设置恢复模式,当以集群模式提交的jobs失败并重启时jobs如何恢复。这只能应用于以Standalone或Mesos方式管理的集群模式的job
spark.deploy.zookeeper.url None 当’spark.deploy.recoveryMode’被设置为ZOOKEEPER时,这个配置用于设置要连接的Zookeeper URL
spark.deploy.zookeeper.dir None 当’spark.deploy.recoveryMode’被设置为ZOOKEEPER时,这个配置用于设置保存恢复状态的Zookeeper目录

Cluster Managers

Spark中的每个cluser管理器都有额外的配置选项。对于每个模式的配置,可以在这些页面中找到:

Environment Variables

某些Spark的配置可以通过环境变量来设置,这些环境变量会从Spark的安装目录中的conf/spark-env.sh脚本中读取(对于windows,会从conf/spark-env.cmd中读取)。在Standalone和Mesos模式中,这些文件可以给出机器的特定信息(如主机名)。当运行本地Spark application或提交脚本时,还会source(unix命令)这个文件。
注意,当Spark被安装时,默认是不存在conf/spark-env.sh文件的。然而,你可以拷贝conf/spark-env.sh.template来创建它。确保拷贝后的文件可以被执行。
下面的变量可以在spark-env.sh中设置:

Environment Variable Meaning
JAVA_HOME Java的安装位置(如果没有在你的默认路径中)
PYSPARK_PYTHON 在driver和worker中,用于PySpark执行Python库
PYSPARK_DRIVER_PYTHON 只在driver中,用于PySpark执行的Python库
SPARK_DRIVER_R 用于SparkR shell执行的R库(默认是R)
SPARK_LOCAL_IP 机器要绑定的IP地址
SPARK_PUBLIC_DNS 你的Spark程序要通知给其他机器的主机名

除了上面的这些,还有一些选项用于Spark standalone cluster scripts,例如每个机器使用的cores的数量和最大内存。
因为spark-env.sh是一个shell脚本,环境变量需要在你的conf/spark-defaults.conf中使用soaprk.yarn.appMasterEnv.[EnvironmentVariableName]属性设置。在集群模式中,在spark-env.sh中设置的环境变量将不能在YARN Application Master进程中引用到。查看YARN-related Spark Properties获取更多信息。

Configuring Logging

Spark使用log4j记录日志。你可以通过在conf目录中添加一个log4j.properties文件来配置它。一种方法是拷贝已经存在的log4j.properties.template文件。

Overriding configuration directory

要指定一个不同与默认”SPARK_HOME/conf”的配置目录,你可以设置SPARK_CONF_DIR。Spark将使用来自这个目录的配置文件(spark-defaults.conf、spark-env.sh、log4j.properties)。

Inheritiong Hadoop Cluster Configuration

如果你想要使用Spark读写HDFS,有两个Hadoop配置文件应该包含在Spark的classpath中:

  • hdfs-site.xml,它为HDFS客户端提供了默认行为。
  • core-site.xml,它设置了默认文件系统名称。
    这些配置文件的位置根据CDH和HDP的版本而变化,但是常用的一个位置是在/etc/hadoop/conf中。一些工具,诸如Cloudera Manager,以on-the-fly算法创建,但是提供一种机制来下载这些配置的副本。
    要是这些文件对于Spark可见,在$SPARK_HOME/spark-env.sh中设置HADOOP_CONF_DIR为包含这些配置文件的位置。