本文是Tuning Spark文档的翻译,原文档请参考,本文主要用于个人学习。
Tuning Spark
因为大多数Spark计算是内存中的计算,因此集群中的任何资源都能够成为Spark程序的瓶颈:CPU、网络带宽或内存。通常,如果数据装载到内存中,瓶颈可能是网络带宽,但是有些时候,你还需要做一些调整,例如以序列化格式存储RDDs,以降低内存的使用。本指南覆盖了两个主题:数据序列化和内存调整,其中数据序列化对好的网络性能是至关重要的,并且还可以降低内存的使用。我们还会概述其他一些小的主题。
Data Serialization
序列化在任何分布式application的执行中扮演了很重要的角色。那些序列化缓慢的对象或消费很大数量byte的格式将极大的减慢计算。通常,这是优化一个Spark application首先要调整的东西。Spark的目标是在方便(允许你在你的操作中使用任何Java类型)和性能之间达到一个平衡。它提供了两种序列化库:
- Java serialization: 默认,Spark使用Java的ObjectOutputStream框架进行序列化对象操作,能够和任何你实现了java.io.Serializable接口的类型一起工作。通过继承java.io.Externalizable,你能够更加近的控制你的序列化的执行。Java序列化是灵活的,但是通常是慢的,这导致对于很多类会有很大的序列化格式。
- Kryo serialization: Spark能够使用Kryo库(版本2)来更快的序列化对象。相对于Java序列化Kryo显然是更快且更加简单的(通常是10倍还多),但是不支持所有的可序列化类型,并且需要你在程序中将你要使用的类进行注册以便获取更好的执行。
通过使用SparkConf初始化你的job,并调用conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)来转换为使用Kryo。这个设置不止为了wroker节点混洗数据配置序列化生成器,当序列化RDDs到磁盘时也有用。Kryo不作为默认序列化生成器的唯一原因是需要自定义注册,但是我们推荐在任何网络集中型application中使用它。
Spark自动的很多常用的核心Scala类包含Kryo序列化生成器,涵盖在Twitter chill库的AllScalaRegistrar中。
要使用Kryo注册你自己的自定义类,使用registerKryoClasses方法。
Kryo documntation描述了更加高级的注册选项,如添加自定义序列化编码器。
如果你的对象很大,你可能需要增加spark.kryoserializer.buffer配置的值。这个值需要足够大以便保存你要序列化的最大对象。
最后,如果你没有注册你的自定义,Kryo将仍然能够工作,但是它将存储每个对象的全类名,这样损耗很大。
Memory Tuning
在调整内存的用法中,有三种值得考虑:被你的对象使用的内存的总量(你可能想要将整个数据集装配到内存中)、访问这些对象的开销和垃圾回收的开销(如果你在这些对象)。
默认,Java对象是快速访问的,但是在它们字段的内部很容消费掉比原始数据多2-5倍的空间 。这是因为有如下原因:
- 每个不同的Java对象有一个”object header”,它大概是是16个字节,并包含信息,诸如一个指向它的class的指针。对于一个带有非常少数据的对象(假设一个Int字段),这可能要比数据大很多。
- Java Strings在原生的string数据上有一个大概40字节的开销(因为它们被存储到一个Chars数组中,并且保存了额外的数据,如长度),并且每个字符以两个字节存储,因为String内部使用的UTF-16进行编码。因此一个包含10个字符的字符串很容易消耗掉60个字节。
- 常见的集合类,如HashMap和LinkedList,使用linked数据结构,每个数据项是一个包装对象(如Map.Entry)。这种对象不只有header,而且还有指针(通常是8个字节)指向列表中的下一个对象。
- 原始类型常常以包装对象来存储,诸如java.lang.Integer。
这一章我们将以Spark中内存管理的概述开始,接着讨论用户能够在他的application中使用而产生内存使用效率的具体策略。另外,我们将描述如何确定你的对象的内存使用,以及如何改善它 - 通过修改你的数据结构或使用一个序列化格式存储数据。我们将涵盖Spark的缓存调整和Java垃圾回收调整。
Memory Management Overview
Spark中内存的使用主要落在两种类型的一个上面:执行和存储。执行内存用于在shuffles、joins、sorts和aggregations中,而存储内存用于缓存和跨集群的内部数据传播。在Spark中执行和存储共享一个统一的区域(M)。当没有执行内存使用时,存储能够获取所有可用的内存,反之亦然。如果必要,执行可能驱逐存储,直到总的存储内存使用低于一个确切的值(R)。换句话说,R在M中描述了一个子区域用来缓存blocks,这个区域的blocks永远不会被驱逐。由于实现的复杂,存储可能不会驱逐执行。
这个设计确保了一些明智的属性。首先,那些没有使用缓存的application可以使用整个空间来进行执行操作,避免不必要的磁盘溢出。第二,那些使用了缓存的application保留了一个最小的存储空间(R),这里的block免于被驱逐。最后,这个方法为多样化的工作量提供了一个合理的out-of-the-box的执行,而不需要用户专业的去进行划分。
然而,还有两个相关的配置,通常用户不需要调整他们,使用默认值对大多数工作量都是合适的:
- spark.memory.fraction M空间的确切大小,以JVM heap空间的分数表示(默认为0.6)。剩余的空间(25%)为用户存储数据结构、Spark中的内部元数据以及在极少情况下和非常大的记录中保护OOM错误。
- spark.memory.storageFraction R空间的确切大小,以M的分数表示(默认为0.5)。R是M中的存储空间,用来缓存blocks,避免被执行驱逐。
spark.memory.fraction的值应该的符合JVM的old generation或”tenured” generation中heap空间。否则,当有太多空间用于缓存或执行时,tenured greneration将会满,从而引发JVM垃圾回收的时间明显增长。查看Java GC sizing documentation获取更多信息。
tenured generation大小由JVM的NewRatio参数控制,默认为2,意味着tenured generation是new generation(heap的剩余)的两倍大小。因此,默认,tenured generation使用2/3或大约0.66的heap。为spark.memory.fraction设置为0.6来保存old generation中存储和执行有空闲的空间。如果spark.memory.fraction增加,假设0.8,那么NewRatio可能要增加到6或更多。
NewRatio作为一个JVM标识对executors进行设置,这意味着添加spark.executor.extraJavaOptions=-XX:NewRatio=x到Spark job的配置中。
Determining Memory Consumption
测量创建一个RDD,一个数据需要请求的计算消耗总数的最好方式是,将它放到缓存中,并查看Web UI中”Storage”页。这个页面将会告诉你这个RDD使用的内存。
要计算一个特定对象的内存消耗,使用SizeEstimator的estimate方法。这对于尝试以不同数据层来缩减内存的使用是有用的,类似于确定一个广播变量在每个executor heap上占用量。
Tuning Data Structures
降低内存消耗的首选方法是避免Java特征(Java特征会增加负载),诸如基于指针的数据结构和包装对象。有一些方法来完成这个:
1、设计你的数据结构引用数组对象,并且是原始类型,而不是标准的Java或Scala集合类(例如HashMap)。fastutil库为原始类型提供了方便的集合类,这些集合类与Java标准库是兼容的。
2、如果可能,避免很多小对象的嵌套结构。
3、对于keys,考虑使用数值类型的ID或枚举对象取代string类型。
4、如果你的RAM小于32GB,设置JVM标识-XX:+UseCompressedOops使指针使用4个字节而不是8个。你能够添加这些选项到spark-env.sh中。
Serialized RDD Storage
当你的对象过于庞大,即使调优也无法有效的存储时,一个更加简单的方法是以序列化格式来存储它们以降低内存的使用,使用RDD persistence API中序列化的存储级别,例如MEMORY_ONLY_SER。那么Spark将以一个大的字节数组来存储一个RDD partition。唯一的缺点是以序列化格式存储的下游,访问时间会更慢,因为获取数据时需要对每个对象进行解序列化操作。如果你想要以序列化格式缓存数据,我们高度推荐使用Kryo,因为它比Java序列化的size更小(确切的是比原生的Java对象)。
Garbage Collection Tuning
当你有大量”搅拌”的RDDs的数据项通过你的程序存储时,JVM垃圾回收会是一个问题。(在你的程序中读取一个RDD一次,并在上面执行很多操作通常不是问题)当Java为驱逐老数据为新数据提供空间时,你需要跟踪所有你的Java对象并找出没用的对象。这里需要记住的一点是垃圾收集的开销与Java对象的数量是成正比的,因此为少量对象使用数据结构(例如一个Ints的数组代替LinkedList)将很大的降低这个开销。甚至一个更好的方法是以序列化格式保存对象,正如上面描述的:每个RDD partition将只有一个对象(一个字节数组)。在尝试其他技术之前,首先要尝试的是使用序列化缓存来判断GC是否是问题。
因为你的task的工作内存(运行task所需的空间)和你节点上RDDs缓存之间的干扰,GC也将是一个问题。我们讨论如何控制RDD缓存空间的分配以缓和这个问题。
Measuring the Impace of GC
在GC优化中的第一步是收集垃圾收集的频率和GC花费的时间长度。这可以通过添加-verbose:gc -XX:+printGCDetails -XX:+PrintGCTimeStamps到Java选项来实现。(查看配置指南来获取传递给Spark jobs的java选项的信息)你的Spark job下一次运行时,你将会在worker的日志中打印每一次垃圾回收的message。注意的是,这些日志将在你集群的worker节点上(这它们工作目录的stdout文件中),而不是driver程序的节点上。
Advanced GC Tuning
要近一步优化垃圾回收,我们首先理解一些关于在JVM中内存管理的基本概念:
- Java Heap空间分为两个区域Young和Old。Young代表示保存存活时间较短的对象,而Old代用于较长生命时长的对象。
- Young代近一步分为三个区域Eden、Survivor1和Survivor2。
- 对于垃圾回收程序的一个简单描述是:当Eden满了时,一个轻量级的GC在Eden上运行,那些存活在Eden和Survivor1中的对象会拷贝到Survivor2中。Survivor区域是交换用的。当一个对应足够老或Survivor2满了,它被移动到Old代中。最终当Old也接近满的时候,一个完整的GC被执行。
在Spark中,GC优化的目标是确保长时间存在的RDDs值被存储在Old代中,而Young代有足够的空间来存储短时间存在的对象。这将能够帮助避免完整GC收集在task执行期间产生的临时对象。一些有用的步骤如下:
- 通过收集GC状态来确认是否有太多的垃圾收集。如果在一个task完成之前,全面的GC被执行多次,这意味着没有足够可用的内存来执行tasks。
- 在被打印的GC状态中,如果Old代接近满了,通过降低spark.memory.storageFraction的值来减少用于缓存的内存。缓存较少的对象要比减慢task的执行更好。
- 如果有很多轻量级的收集,但是没有很多的重要的GCs,为Eden分配更多的内存会有所帮助。你可以根据每个task所需的内存的预估来设置Eden。如果Eden的大小确定为E,那么你可以使用-Xmn=4/3*E来设置Young代的大小。(4/3的比例说明由Survivor区域使用的空间)
- 作为一个例子,如果你的task从HDFS读取数据,由task使用的内存量可以使用从HDFS读取数据块的size来估计。注意,一个解压缩后的数据块的大小通常是那个数据块的2到3倍。因此如果我们希望有3到4个任务量的工作空间,并且HDFS数据块的大小为64MB,那么我们能够确定的Eden的大小为4364MB。
- 监控使用新的设置后垃圾收集的频率和时间。
我们的经验建议是,GC优化的效果依赖于你的application和可用的内存总数。这里有很多在线描述的优化选项,但是在高层次上,管理完整GC发生的频率能够帮助降低开销。
Other Considerations
Level of Parallelism
集群不会被充分利用,除非你为每个操作设置了足够高的并发级别。Spark依照每个文件的大小自动设置在文件上运行”map”任务的数量(虽然你可以通过SparkContext.textFile的可选参数来控制它),并且分发”reduce”操作,诸如groupByKey和reduceByKey,它使用partitions的最大父级RDD的数量。你可以传递并发姐别作为第二个参数(查看spark.PairRDDFunctions文档),或者设置配置属性spark.default.parallelism来更改默认值。通常,我们推荐在你的集群中每个CPU 2-3个tasks。
Memory Usage of Reduce Tasks
有些时候,你会得到一个OutOfMemoryError错误,这个错误并不是因为你的RDDs装不到内存中,而是因为你的tasks中的一个task的工作集合,诸如groupByKey中的reduce任务中的一个太大了。Spark的shuffle操作(sortByKey, groupByKey, reduceByKey, join等)在每个task中构建一个哈希表来执行分组,这个哈希表通常很大。最简单的解决方法是增加并行的级别,因此每个task的输入集会更小。Spark能够有效的支持最短200毫秒的tasks,因为它跨很多tasks重利用executor JVM,并且它有很低的task启动开销。因此你能够安全的增加并行级别到比你集群的cores的数量还要多。
Broadcasting Large Variables
在SparkContext中使用广播变量功能能够极大的降低每个序列化task的工作量,以及降低集群上一个job的启动开销。如果driver程序中任何task中使用了大对象,考虑将它转换为一个广播变量。Spark在master打印每个task的序列化大小,因此你可以查看并判断你的tasks是否太大;通常一个task大于20KB,就值得优化。
Data Locality
数据本地化优势在Spark job的执行上有很大的影响。如果数据和操作数据的代码都在一个节点上,那么计算将会很快。如果数据和代码是分离的,那么必须将它们移动到一起。通常将代码从一个地方拷贝到另一个地方要比拷贝数据更快,因为代码的大小要比数据块晓得多。Spark围绕着数据本地化优势这个普遍原则来构建它的调度。
数据本地化优势是处理数据的代码和数据有多近。基于数据的当前位置,有一些级别的本地化优势。按照从近到远的顺序:
- PROCESS_LOCAL 数据和运行它的代码在相同的JVM中。这是最好的本地化优势。
- NODE_LOCAL 数据和代码在相同的节点上。例如可能在相同节点的HDFS中,或者在相同节点的另一个executor中。这要比PROCESS_LOCAL慢一点,因为数据需要在进程间传输。
- NO_PREF 等同于从任何地方访问数据,没有本地优势。
- RACK_LOCAL 数据在服务器的相同机架上。数据在相同机架的不同服务器上,因此需要跨网咯传输,通常只跨一个交换机。
- ANY 数据在不同相同机架上的其他网络上。
Spark喜欢以最好的本地化优势级别来调度所有的tasks,但这不总是可能的。在那些没有未处理的数据的空间executor上,Spark转换为较低级别的本地化优势。有两个选项:a)在数据所在的server上等待,直到有一个忙的CPU释放,然后启动一个task。b)立即在一个较远位置启动新的task,这需要请求远程数据。
Spark通常会等待一会儿希望能有忙碌的CPU被释放。一旦时间过期,它将从远处移动数据到空闲的CPU。每个级别的回退等待时间可以单独配置或者一起使用一个配置;查看spark.locality参数获取更多信息。如果你的task长并且缺少本地化优势,你应该增加这个配置值,但是默认值通常很好了。
Summary
这里已经有一个简短的指南来指出关于你在优化一个Spark application时的主要关注点 - 最主要的,数据序列化和内存优化。对于大多数程序,转换为Kryo序列化并以序列化格式保存数据将解决大多数常见的性能问题。关于其他的优化实践,可以在Spark mailing list上自由发问。