本文是对spark编程指南的翻译,主要用于自己对spark的理解,原文档链接
Linking with Spark
Spark 2.0.0默认使用Scala 2.11进行构建和部署。(Spark也可以使用其他版本的Scala进行构建)要使用Scala来编写application,你需要使用一个合适的Scala版本(如2.11.X)。
要编写一个Spark application,你需要在添加Spark的Maven依赖。Spark在Maven中可用的坐标为:
另外,如果你想要访问一个HDFS集群,你需要添加与你的HDFS对应版本的hadoop-client依赖:
最后,你需要在你的代码中引入一些Spark类,添加如下这些行:
(在Spark 1.2.0之前,你需要明确的引入org.apache.spark.SparkContext._来启用必要的隐式转换)
Initializing Spark
一段Spark程序首先要做的一件事情是创建一个SparkContext对象,SparkContext会告诉Spark如何访问集群。要创建一个SparkContext,你首先要构建一个SparkConf对象,SparkConf对象会包含你的application的信息。
每个JVM只可能有一个SparkContext处于活跃状态。在创建一个新的SparkContext之前必须要停止(调用stop()方法)活跃的SparkContext。
appName参数是为你的application指定一个名称,用于在集群UI中显示。master是一个Spark、Mesos或YARN模式的集群URL,如果是本地模式,则是特殊的字符串”local”。在实践中,当你在一个集群上运行application时,你不希望在你的程序中对master进行硬编码,而是希望在使用spark-submit启动application时在程序中接收master的信息。然而如果是为了本地测试或单元测试,你可以传递”local”来在进程内运行Spark。
Using the shell
在Spark shell中,已经为你创建了一个SparkContext,变量名为sc。在Spark shell中构建你自己的SparkContext将无法工作。你可以使用–master参数来设置context要连接的master,还可以通过传递一个逗号分隔的列表给–jars参数来将jars添加到Spark的classpath中。你还可以通过提供一个以逗号分隔的maven坐标的列表给–packages参数来增加依赖到你的Spark session中。依赖可能存在于其他库,这些库可以通过–repositories参数来传递。例如,要以4个core来运行bin/spark-shell,可以使用:
或者添加一个code.jar到它的classpath,使用:
要使用maven坐标来引入依赖,使用:
对于完整的选项列表,可以运行spark-shell –help。
Resilient Distributed Datasets(RDDs)
Spark围绕着一个resilient distributed dataset(RDD)概念,RDD是一个数据项容错集合,RDD能够并行操作。有两种方式可以创建RDDs:在你的driver程序中并行化一个已经存在的集合,或引用额外存储系统中的数据集,外部存储系统可以是共享文件系统、HDFS、HBase或任何提供Hadoop输入格式的数据源。
Parallelized Collections(并行集合)
通过在你的driver代码中已经存在的集合上调用SparkContext的parallelize方法来创建并行集合(一个Scala序列)。集合项并行的从一个分布式数据集中被拷贝。例如,这里展示了如何创建一个并行的集合,集合中保存了1到5这5个数字:
一旦创建完成,这个分布式数据集(上面的distData)能够被并行处理。例如,你可能调用distData.reduce((a, b) => a + b)来将数组中的数组项加起来。我们稍后再描述分布式数据集上的操作。
并行集合的一个重要参数是数据集要拆分成partition的数量。Spark将为集群的每个partition运行一个task。典型的在你的集群中每个CPU有2-4个patitions。通常,Spark基于你的集群自动的尝试设置partitions的数量。然而,你也能够通过给parallelize(如sc.parallelize(data, 10))传递第二个参数来手动设置。注意:在代码的一些位置使用了分片(partition的同义词)来保证向前兼容。
External Datasets
Spark能够从任何被Hadoop支持的存储源创建分布式数据集,包括你的本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark支持文本文件(text文件)、序列文件(Sequence文件)和任何其他Hadoop的输入格式。
文本文件RDDs能够使用SparkContext的textFile方法创建。这个方法需要文件的URI(机器的本地路径或一个hdfs://、s3n://等URI),并作为集合的行进行读取。下面是一个执行例子:
一旦创建完成,distFile能够按照dataset操作进行处理。例如,我们可以使用map和reduce操作将所有行的size加起来:dictFile.map(s => s.length).reduce((a, b) => a + b)。
使用Spark读取文件的一些注意点:
- 如果使用本地文件系统的一个路径,那么这个文件在worker节点的相同路径上也是可以访问的。可以将文件拷贝到所有worker节点,或使用一个网络挂载的共享文件系统。
- Spark所有的基于文件的输入方法,包括textFile,支持目录、压缩文件和通配符。例如,你能够使用textFile(“/my/director”)、textFile(“/my/directory/.txt”)和textFile(“/my/directory/.gz”)。
- textFile方法同样有第二个可选的参数,用于控制文件的partition的数量。默认情况,Spark为文件的每个block(在HDFS中,每个block默认为64MB)创建一个partition,但是你可以通过传递一个更大的值来要求更多数量的partition。注意,你不能设置比blocks更稍等饿partition数量。
除了text文件,Spark的Scala API还支持其他集中数据格式:
- SparkContext.wholeTextFiles可以读取包含多个小文本文件的目录,并将每个小文件作为(filename, content)对进行返回。这与textFile形成对比,textFile将会为每个文件中的每行返回一条记录。
- 对于序列文件,使用SparkContext的sequenceFile[k, v]方法,其中K和V是文件中key和value的类型。这些应该是Hadoop的Writable接口(像IntWritable和Text)的子类。另外,Spark允许你为一些常用的Writable指定原生类型;例如,sequenceFile(Int, String]将会自动读取IntWritables和Texts。
- 对于其他Hadoop输入格式,你可以使用SparkContext.hadoopRDD方法,它需要一个专门的JobConf和输入格式类(input format class)、key类和value类。这与你使用你的输入源为你的hadoop job进行设置的方法相同。你还可以基于“新的”MapReduce API(org.apache.hadoop.mapreduce)为输入格式使用SparkContext.newAPIHadoopRDD。
- RDD.saveAsObjectFile和SparkContext.objectFile支持将一个RDD以序列化的Java对象的简单格式来进行保存。然而对于专业的格式(像Avro)当前还不支持,它值提供了简单的方式来保存RDD。
RDD Operations
RDD支持两种类型的操作:transformations和actions。transformations用于从一个存在的数据集创建一个新的数据集;actions用于在一个数据集上执行计算后将值返回给driver程序。例如,map是一个transformation,它会将数据集的每一项传递给一个方法并返回一个代表了结果的新的RDD。另一方面,reduce是一个action,它使用一些函数聚合了RDD的所有数据项并将最终的结果返回给driver程序(尽管还有一个并行的reduceByKey会返回一个分布式数据集)。
在Spark中,所有的transformations都是懒的,它们不会立即计算数据结果,只是对应用到基础数据集(如文件)的transformatin进行记录。transformation只有当一个action要求返回结果给driver程序时才会计算。这个设计使得Spark的运行更加高效。例如,我们能够知道一个通过map创建的dataset将被用于到一个reduce,并且返回reduce的唯一结果给driver,而不是较大的映射数据集。
默认情况下,你运行一个action时,那么action依赖的被转换的每个RDD可能会被重新计算。然而,你还能够使用persist(或cache)方法将一个RDD保存到内存中,在这种情况中,Spark将把数据项保存在集群中,一般在你下次查询它时更快的访问。同样还支持将RDDs保存到磁盘,或跨多个节点复制。
Basics
要阐述RDD本质,思考一下下面的简单代码:
第一行根据外部文件定义了一个基本的的RDD。这个数据不会被加载到内存中,除非有所行为:lines只是指向到这个文件。第二行定义了lineLengths,作为map转换的结果。再次重申,lineLengths不会被立即计算,这是由于懒机制。最后,我们运行reduce,它是一个action。在这个时候,Spark将计算拆分称tasks,每个task在单独机器上运行,每个机器上都会运行它自己的map部分和reduce,只返回结果给driver程序。
如果我们之后想再次使用lineLengths,我们可以添加:
这个方法能够引发lineLengths在第一次计算完成后,reduce之前被保存到内存中。
Passing Functions to Spark
Spark的API在很大程度上依赖于在driver程序中传递的函数,这些函数在集群中运行。要达到函数的传递,有两种方法可以推荐使用:
- 匿名函数,这通常被用于短的代码片段中。
- 一个全局单例对象的静态方法。例如,你可以定义一个object MyFunctions,并传递MyFunction.func1,如下:
|
|
注意,可能会传递一个类的实例中(相对于单例对象)的一个方法,这要求发送这个对象(包括那个类和方法)。例如,考虑:
这里,如果我们创建一个新的MyClass实例并调用它的doStuff,它内部的map引用了MyClass实例的func1方法,因此,整个对象需要被发送给集群。写rdd.map(this.func1(x))也是类似的。
类似的方式,访问外部对象的字段,也将会引用整个对象:
等同于写rdd.map(x => this.field + x),它引用了this。要避免这个问题,最简单的方式拷贝field到一个本地变量中,来代替外部访问:
Understanding closures
比较难的事情之一是理解变量的作用域和变量的生命周期以及方法什么时候跨集群执行。那些在变量作用域之外修改变量的RDD操作常常是造成混乱的根源。在下面的例子中,我们将看到代码中使用foreache()来对计数器进行增加,但是其他的操作也可能会发生类似的问题。
Example
考虑简单RDD项的向下求和,这个操作可能根据是否在同一个JVM上执行而产生不同的行为。一个常见的例子是当以本地模式运行Spark(–master=local[n]) VS 以集群模式部署Spark application(如通过spark-submit提交到YARN):
Local vs. cluster modes
上面代码的行为是未知的,并且可能无法按预期工作。要执行jobs,Spark将RDD的处理操作拆分到tasks中,每个task被一个executor所执行。在执行之前,Spark计算任务的闭包。闭包是那些对于executor在RDD上执行计算(在本例中是foreach())所必须可见的变量和方法。闭包被序列化并发送给每个executor。
当counter在foreach函数中被引用时,发送给每个executor的闭包中的变量被复制,因此它不再是driver节点上的counter。在driver节点的内存中仍然有一个counter,但是它不再被executors可见!executors只能看到从序列化闭包中复制过来的。因此,counter的最终值仍然是0,直到在序列化闭包中引用counter值的所有操作完成。
在本地模式的一些情况中,foreach函数实际上将会在与driver相同的JVM中运行,并且会引用相同的原生counter,因此可能是真正更新counter。
要确保行为在这些情景中明确定义,一种方式是使用累加器(Accumulator)。在Spark中累加器被用于明确的提供一种机制,通过这种机制可以保证在集群中跨worker节点更新变量是安全的。累加器将会在之后的章节中详细讨论。
通常,闭包 - 结构类似循环或局部定义的方法,不应该被用于改变全局状态。Spark没有定义也无法保证从闭包外部修改对象引用的行为。一些代码在本地模式中可能能够工作,但是那只是意外,并且这样的代码在分布模式中的表现不能满足期望。
如果需要全局聚合应该使用累加器。
Printing elements of an RDD(打印RDD的项)
另外一个常用的是使用rdd.foreach(println)或rdd.map(println)来尝试打印一个RDD包含的项。在单个机器上,这通常能够如期望的打印出RDD包含的所有项。然而在集群模式中,由executors调用的输出stdout会被写到executor的stdout,而不是driver上的标准输出,因此driver上的标准输出不会现实这些项。要在driver上打印出所有的项,可以使用collect()方法先将RDD拿到driver节点,因此:rdd.collect().foreach(println)。这会引发driver的内存溢出,因为collect()会将整个RDD拿到单个机器上;如果你需要打印RDD的一部分,一个比较安全的方法是使用take():rdd.take(100).foreach(println)。
Working with Key-Value Pairs
虽然工作与RDDs大多的Spark操作包含了任何类型的对象,但是有一些操作只对key-value对的RDDs有效。最常见的一些就是分布式shuffle操作,诸如根据key分组或根据key汇总。
在Scala中,这些操作仅对包含Tuple2对象的RDDs有效(在语言中内置的tuples,通过简单的写(a, b)来创建)。key-value对操作在PairRDDFunctions类中有效,PairRDDFunctions自动的将tuples的RDD包装。
例如,下面的代码在key-value对上使用reduceByKey操作来计算文本每行在文件中出现的次数:
我们还能够使用counts.sortByKey(),例如,按字母顺序排列pairs,并最终调用counts.collect()将它们作为一个数组对象返回到driver程序。
注意:当使用自定义对象在key-value对操作中作为key,你必须确认自定义的equals()有一个匹配的hashCode()方法。所有的细节,参考Object.hashCode() “Object.hashCode”)文档。
Transformations
下面的表格列出了Spark支持的常用transformations。参考RDD API文档(Scala、Java、Python、R)和pair RDD函数文档(Scala、Java)获取更多信息。
Transformation | Meaning |
---|---|
map(func) | 通过将源中的每个数据项传递给函数func,来返回一个新的分布式数据集 |
filter(func) | 通过选择源中数据项在func函数中返回true的,来返回一个新的分布式数据集 |
flatMap(func) | 与map相似,但是每个输入项能够被映射到0或多个输出项(因此func应该返回一个序列而不是单个值) |
mapPartitions(func) | 与map相似,但是在RDD的每个partition(block)上分别运行,因此当在一个T类型的RDD上运行时,func的类型必须是Iterator |
mapPartitionsWithIndex(func) | 与mapPartitions相似,但是func还提供了一个整型值来表示partition的索引,因此当在一个T类型的RDD上运行时,func的类型必须是(Int, Iterator |
sample(withReplacement, fraction, seed) | 对数据进行fraction比例的抽样,重复抽样或非重复抽样,并使用给定的随机数生成器种子 |
union(otherDataset) | 返回一个新的数据集,新的数据集将包含源数据集中的所有数据和参数给定数据集中的所有数据项 |
intersection(otherDataset) | 返回一个新的数据集,新的数据集将包含源数据集和参数数据集所有数据项的交集 |
distinct([numTasks]) | 返回一个新的数据集,新的数据集中包含源数据集中数据项去重后的数据项 |
groupByKey([numTasks]) | 当在一个(K, V)对的数据集上调用时,返回一个(K, Iterable |
reduceByKey(func, [numTasks]) | 当在一个(K, V)对的数据集上调用时,返回一个(K, V)对的数据集,其中values是每个key使用给定的reduce函数聚合的,它必须是(V, V) => V类型的函数。在groupByKey中,reduce任务的数量可以通过第二个可选参数进行配置。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 当在一个(K, V)对的数据集上调用时,返回一个(K, U)对的数据集,其中values是每个key使用给定的组合函数和中性零值的聚合得到的。为了避免不必要的分配,允许聚合后的值的类型与输入值的类型不同。和groupByKey相似,reduce任务的数量可以通过第二个可选参数来配置 |
sortByKey([ascending], [numTasks]) | 当在一个(K, V)对的数据集上调用时,返回一个按key正序或倒序排列的(K, V)对的数据集,其中K实现排序,以boolean类型指定ascending参数 |
join(otherDataset, [numTasks]) | 当在(K, V)和(K, W)类型的数据集上调用时,返回一个(K, (V, W))的数据集,(V, W)是每个key的所有数据项的集合。外部连接通过leftOuterJoin、rightOuterJoin和fullOuterJoin来支持 |
cogroup(otherDataset, [numTasks]) | 当在(K, V)和(K, W)类型的数据集上调用时,返回一个(K, (Iterable |
cartesian(otherDataset) | 当在T和U类型的数据集上调用时,返回一个(T, U)对的数据集(数据项的所有pairs)。 |
pipe(command, [envVars]) | 通过一个shell命令(一个perl或bash脚本)将RDD的每个partitions拼接。RDD数据项被写到进程的标准输入并输出到它的标准输出,作为一个RDD的string返回 |
coalesce(numPartitions) | 减少RDD中partition的数量到numPartitions。用于在过滤一个大的数据集之后执行操作更加有效 |
repartition(numPartitions) | 随机的shuffleRDD中的数据来创建更多或更少的partitions,来跨partitions进行平衡。这将总是跨网络混洗所有的数据 |
repartitionAndSortWithinPartitions(partitioner) | 依照给定的拆分器对RDD进行重新分区,在每个产生结果的partition中,根据记录的key进行排序。这比调用repartition然后再在每个partition中排序要高效,因为它能够将排序推到混洗机制中 |
Actions
下面的表格列出了被Spark支持一些常用actions。参考RDD API文档(Scala、Java、Python、R)和对应的RDD函数文档(Scala、Java)来获取更多细节。
Action | Meaning |
---|---|
reduce(func) | 使用一个函数func(它有两个参数一个返回值)来聚合数据集的数据项。这个函数应该做交换或组合,因此它能够被正确的并行计算 |
collect() | 将数据集的所有数据项作为一个数组返回到driver程序。这通常在一个filter或其他操作之后有用,这个filter或其他操作会返回数据的非常小的子集 |
count() | 返回数据集中数据项的个数 |
first() | 返回数据集的第一条数据项(类似take(1)) |
take(n) | 返回包含数据集前n个数据项的数组 |
takeSample(withReplacement, num, [seed]) | 返回一个包含了数据集num个随机样本的数组,可以重复抽取或不重复抽取,可选参数用于预先指定一个随机数生成器种子 |
takeOrdered(n, [ordering]) | 使用原生的或自定义的比较器获取RDD的前n个数据项 |
saveAsTextFile(path) | 将数据集的数据项作为一个文本文件(文本文件集)写到本地文件系统、HDFS或其他被Hadoop支持的文件系统的指定目录中。Spark会在每个数据项上调用toString,来将数据项转换为文件中的一行文本 |
saveAsSequenceFile(path)(Java and Scala) | 将数据集的数据项作为一个Hadoop序列文件写到本地文件系统、HDFS或任何其他被Hadoop支持的文件系统中的指定目录。这只对那些实现了Hadoop的Writable接口的key-value对的RDDs有效。在Scala中,它也对那些隐式转换为Writable的类型有效(Spark有基础类型的转换,如Int、Double、String等) |
saveAsObjectFile(path)(Java and Scala) | 使用Java序列化将数据集的数据以简单的格式写出,这种文件可以使用SparkContext.objectFile()进行加载 |
countByKey() | 只对(K, V)类型的RDD有效。返回一个(K, Int)对的hashmap,(K, Int)是每个key和它的个数 |
foreach(func) | 在数据集的每个数据项上运行一个函数func。它经常用于副作用,例如更新一个累加器或与外部存储系统相互作用。 注意:修改foreach()之外的除了累加器类型的变量可能会引起未定义的行为。查看Understanding closures获取更多的信息 |
Spark RDD API还公开了一些异步action的版本,像异步版本foreach的foreachAsync,它会立刻返回一个FutureAction给调用者,而不是阻塞来等待action的完成。这可以用于管理或等待action的异步执行。
Shuffle operations
在Spark中必然会触发的事件是shuffle。shuffle是Spark的数据重新分发的机制,因此它会跨不同的partition进行分组。这通常会跨executor和机器执行数据拷贝,进行shuffle是一个复杂和高开销的操作。
Background
要理解在shuffle期间发生了什么,我们可以思考一下reduceByKey的例子。reduceByKey生成了一个新的RDD,新的RDD是将单个key组合到一个tuple(key不变,value是这个key的所有value执行reduce函数的结果)中。问题是不是相同partition上单个key的所有值,也不是相同机器上单个key的所有值,而是它们必须协同协作来计算这个结果。
在Spark中,对于某个操作,数据通常不是跨partition分布而位于必须的位置。在计算过程中,单个task将会操作单个partition - 因此,要为单个reduceByKey reduce任务执行组织所有的数据,Spark需要执行一个all-to-all的操作。它必须从所有的partition读取数据来为所有的keys查找所有的值,然后针对某个key需要跨partition将数据拿到一起来计算最终结果-这被称为shuffle。
虽然新的shuffle数据的partition中的数据集是确定的,因此partition是顺序的,而partition中的数据项是非顺序的。如果想跟着shffle有一个预期的顺序数据,那么可以需要使用:
- mapPartitions 用于对每个partition进行排序,例如,.sorted。
- repartitionAndSortWithinPartitions 在重分配partitions的同时有效对partitions进行排序。
- sortBy 产生一个全局的顺序的RDD。
能够引发shuffle的操作包括,复制操作像repartition和coalesce,根据key进行操作(除了计数的)的像groupByKey和reduceByKey,join的操作像cogroup和join。
Performance Impact
Shuffle是一个昂贵的操作,因为它会执行磁盘I/O、数据序列化以及网络I/O。为了给shuffle组织数据,Spark生成一组tasks-map任务来组织数据,一组reduce的tasks来对数据进行聚合。这个术语来自MapReduce,但是没有直接的应用到Spark的map和reduce操作上。
内部,每个map的result被保存在内存中直到它们不能被装下。然后,它们基于目标partition被排序并写到单个文件中。在reduce端,tasks读取相关排序的blocks。
无疑shuffle操作会消耗大量的heap内存,因为它们在转换数据之前或之后要使用内存中的数据结构来组织这些数据记录。明确的说,reduceByKey和aggregateByKey在map端创建这些数据结构,而’ByKey操作在reduce端生成这些数据结构。当数据无法装到内存中时,Spark将会把这些数据溢出到磁盘,这将引发额外的磁盘I/O负载以及增加垃圾回收。
Shuffle还会在磁盘上生成很大数量的中间文件。从Spark1.3起,这些文件会被一直保存,直到对应的RDDs不再使用后被回收。因为这样做,所以如果系统被重新计算shuffle文件不需要被重新创建。如果application维持着这些RDDs的引用或者如果GC不是频繁启动,那么垃圾回收可能会在一段时间之后发生。这意味着长时间运行Spark jobs可能会消费掉很多的磁盘空间。这个临时的存储目录可以在配置Spark context时通过spark.local.dir配置参数指定.
通过调整一些配置参数,shuffle行为能够被优化。查看Spark Configuration中的’Shuffle Behavior’章节。
RDD Persistence
在Spark中一个很重要的能力是persisting(或caching)一个数据集到内存中。当你persist一个RDD时,每个节点存储在内存中的partition会在这个RDD上的其他action中被重用。这使得之后的actions更加快(通常是十倍以上)。缓存对于迭代算法和快速交互的一个关键工具。
使用persist()或cache()方法在一个RDD上,你可以使这个RDD被persist。首先它在一个action中被计算,然后被保存到nodes的内存中。Spark以容错的方式进行缓存 - 如果RDD的任何一个partition丢失,它将会被使用原来创建它的transformations自动的被重新计算。
另外,每个persist的RDD,允许你使用不同的存储级别进行存储,例如,在磁盘上persist数据集、将数据作为序列化对象persist到内存中、跨节点进行复制。这是级别可以通过传递一个StorageLevel对象(Scala、Java、Python)给persist()方法进行设置。cache()方法是使用默认存储级别的速记,使用的级别是StorageLevel.MEMORY_ONLY(在内存中存储序列化的对象)。所有存储级别的设置如下:
Storage Level | Meaning |
---|---|
MEMORY_ONLY | 将RDD作为解序列化的Java对象保存在JVM中。如果RDD在内存中装不下,一些partition将不会被缓存,并在每次需要它们的时候重新计算。这是默认级别 |
MEMORY_AND_DISK | 将RDD作为解序列化的Java对象保存在JVM中。如果RDD在内存中装不下,则将无法存下的partitions存到磁盘,并在需要它们的时候从磁盘进行读取 |
MEMORY_ONLY_SER(Java and scala) | 将RDD作为序列化的Java对象保存(每个partition一个字节数组)。相比解序列化对象,可以节省更多空间,特别是使用一个快的序列化器时,但是读取时会更加花费CPU |
MEMORY_AND_DISK_SER (Java and scala) | 与MEMORY_ONLY_SER类似,但是对于那些装不到内存的partitions,会被溢出到磁盘,而不是在每次需要它们的时候重新计算 |
DISK_ONLY | 将RDD partitions只存储到地盘 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 | 和上面的级别相同,只是每个partition会被复制到两个集群节点 |
OFF_HEAD (实验性的) | 与MEMORY_ONLY_SER类似,但是将数据存储到off-heap memory中。这要求off-head memory启用 |
注意:在Python中,存储对象将总是使用Pickle库对其进行序列化,因此你是否选择一个序列化级别都是没有关系的。在Pytho中可用的存储级别包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY,DISK_ONLY_2。
Spark在shuffle操作(如reduceByKey)中也会自动的persist一些中间数据,即使用户没有调用persist方法。这样做是为了避免shuffle期间因为某个节点的失败而重新计算整个输入数据。我们仍然推荐用户自己在结果RDD上调用persist,如果想要重复使用这个结果RDD。
Which Storage Level to Choose?
Spark的存储级别意味着在内存使用和CPU效率之间提供了不同的权衡。我们推荐通过下面的流程进行选择:
- 如果你的RDDs使用默认存储级别(MEMORY_ONLY)装到内存,那就这样吧。这是一个最高CPU效率的选项,这允许RDDs上的操作尽可能快的运行。
- 如果上面一条不满足,尝试使用MEMORY_ONLY_SER并选择一个快的序列化库来使得对象是更加空间优化,但是访问速度仍然是相当快的。
- 不要将数据溢出到磁盘,除非计算你的数据集的函数非常的昂贵或着是过滤了很大数量的数据。否则重新计算一个partition可能要比从磁盘读取它更快。
- 如果你想要更快的故障恢复(如果使用Spark来服务来自一个web application的请求),使用副本的存储级别。所有的存储级别通过重新计算丢失的数据来提供完全的容错机制,但是副本的存储级别可以使你继续在RDD上运行tasks,而不要等待丢失partition的重计算。
Removing Data
Spark在每个节点上自动进行缓存的监控,并以least-recently-used(LRU)的方式将旧数据丢掉。如果你想要认为的移除一个RDD而不是等到它离开缓存,可以使用RDD.unpersist()方法。
Shared Variables
通常,当一个函数被传递到一个Spark操作(如map或reduce)用于在一个远程集群节点执行时,所有在函数中的变量会以一个单独的拷贝的方式被函数使用。这些诶变量被拷贝到每个机器,远程机器上的变量更新不会传递到driver程序。一般支持跨tasks读写共享变量是低效的。然而Spark提供了两种类型的共享变量用于两种使用模式:广播变量和累加器。
Broadcast Variables
广播变量允许程序员在每台机器上保存一个只读的变量缓存,而不是跟tasks一起传输变量的拷贝。例如,它们可以被用于以一种有效的方式给每个节点一个大输入数据集的拷贝。Spark还尝试使用有效的广播算法来分发广播变量以降低通信开销。
Spark action通过一系列阶段被执行,每个阶段通过分发的”shuffle”操作分开。Spark自动广播每个步骤中被任务需要的常用数据。以这种方式广播的数据以序列化的方式被缓存,并在运行每个task之前被解序列化。这意味着只有当tasks跨多个阶段需要相同数据时或以非序列化格式缓存数据时创建广播变量是有用的,明确这些很重要。
通过调用SparkContext.broadcast(v),可以根据变量v来创建广播变量。广播变量是v的一个包装,它的值能够通过调用value方法来访问。下面的代码展示了这些:
在创建广播变量之后,它应该被用于任何在集群上运行的函数中来代替值v,这样v就不会被传递给节点多次。另外,对象v在它被广播之后不应该被修改,这是为了确保所有的节点得到的是相同值的广播变量(如果这个变量之后被传递给一个新的节点)。
Accumulators
累加器就是那些通过联合和交换操作只能增加的变量,因此它们能够以并行方式被有效支持。它们能够被用于重要的计数(如在)或求和。Spark天生就支持数字类型的累加器,而且程序员可以添加对新类型的支持。
如果累加器使用一个名称来创建,那么它们将会在Spark的UI中显示。这样对理解运行阶段的进程有帮助(注意:这在Python中还不支持)。
通过调用SparkContext.accumulator(v)来根据初始值v来创建一个累加器。集群上运行的tasks可以使用add方法或+=操作符(在Scala和Java中)对累加器进行累加。然而这些tasks不能去读累机器的值。只有driver程序能够使用累加器的value方法来读取累加器的值。
下面的代码展示了一个累加器被用来将数组的所有项进行累加。
虽然这段代码使用的是累加器内置支持的Long类型,然而程序员可以通过继承AccumulatorV2来创建自己的类型。抽象类AccumulatorV2有很多需要重写的方法:reset用于将累加器重新设置为0;add用于将另一个值累加到累加器;merge用于将另外一个类型相同的累加器与当前的合并在一起。其他需要重写的方法请参考Scala API文档。例如,假设我有一个MyVector类来表示数学向量,我们可以写:
注意,当程序员定义自己的AccumulatorV2类型时,结果的类型可以和添加的数据项的类型相同,也可以不相同。
对于只在actions中执行的累加器更新,Spark保证每个task的对累加器的更新只会被应用一次,比如,重启tasks将不会更新值。在tansformations中,用户应该知道如果tasks或job阶段被重新执行,每个task的更新可能会被应用多次。
累加器不会更改Spark的懒求值模式。如果累加器在一个RDD的操作上被更新,它们的值只会在RDD作为action的一部分被计算时更新一次。因此,当在一个懒transformation(像map)中时,累加器的更新操作是不能够保证执行的。下面的代码片段展示了这个属性:
Deploying to a Cluster
application submission guide描述了如何提交appplication到集群。简而言之,一旦你将你的application打到一个JAR(Java/Scala)或一组.py或.zip文件(对于Python),bin/spark-submit脚本将提交它到任何支持的集群管理器。
Launching Spark jobs from Java/Scala
org.apache.spark.launcher包为使用一个简单Java API启动Spark job作为子进程的类。
Unit Testing
Spark对于使用任何流行的单元测试框架都是友好的。将master URL设置为local来在你的测试中创建一个SparkContext,执行你的操作,然后调用SparkContext.stop()关闭即可。确保你停止context是在finally块或测试框架的tearDown方法中,因为Spark不支持在相同的程序中同时运行两个contexts。
Migrating from pre-1.0 Versions of Spark
Spark1.0为1.X系列冻结了Spark Core的API,在当前有效的任何没有标记”experimental”或”developer API”的API将会在未来的版本中将会被支持。对于Scala用户唯一的更改是分组操作,例如groupByKey、cogroup和join,从返回(Key, Seq[Value])对修改为(Key, Iterable[Value])。
迁移指南对Spark Streaming、MLlib和GraphX也是有效的。
Where to Go from Here
你可以在Spark网站看一些Spark程序实例。另外,Spark在examples目录中(Scala、Java、Python、R)包含了很多的用例。通过传递类名给Spark的bin/run-example脚本来运行Java和Scala的例子;例如:
对于Python的例子,使用spark-submit代替:
对于R的例子,使用spark-submit代替:
为了帮助你优化自己的程序,confuguration和tuning指南提供了最好的实践信息。对于确保你存储在内存中的数据是否是有效的格式特别重要。为了帮助部署,cluster mode overview描述了与分发操作和被集群管理器支持的组件。
最终,所有的API文档在Scala、Java、Python和R中是有效的。