Spark Streaming Programming Guide

本文是Spark Streaming手册的翻译文档,会随着自己的实现进行更新,官方文档请参考

Overview

Spark Streaming是核心Spark API的一个延伸,它对实时数据流进行可扩展的、高吞吐量的、容灾的进行处理。数据可以从很多源(如Kafka、Flume、Kinesis或TCP socket)进行提取,然后被复杂的算法组合处理,这些复杂的算法可以使用高级别的函数,如map、reduce、join和window。最后,被处理过的数据可以推出到外部文件系统、数据库和实时图表中。实际上你可以在数据流上应用Spark的机器学习图处理
spark streaming architecture
在内部,它如下工作。Spark Streaming接收实时的输入数据流,并将数据划分到批次中,然后在批次中数据被Spark引擎处理并生成最终的结果流。
Spark Streaming data flow
Spark Streaming提供了一个高级别的抽象,叫做discretized stream或DStream,它代表了一个连续的数据流。DStream可以从来自数据源(如Kafka、Flume和Kinesis)的输入数据流创建,也可以通过在其他DStream上应用高级别的操作来创建,一个DStream以一个RDDs序列来表示。
本指南展示了如何开始使用DStreams来编写Spark Streaming程序。你可以使用Scala、Java或Python(从Spark1.2中引入)来编写Spark Streaming程序,这些语言的代码都会在本指南中提供。你会发现tabs在本指南中随处可见,是你可以在不同语言的代码片段之间任意选择。
注意:在Python中有少量的APIs是不同或不可用的。贯穿整个指南,你会发现Python API标签高亮了这些不同。

A Quick Example

在进入如何编写你的Spark Streaming程序的详细信息之前,我们快速的看一个简单的Spark Streaming程序是什么样的。假设我们想要计算来自TCP socket服务的文本数据中每个字出现的次数。所有你需要的如下:
首先,我们导入Spark Streaming类的名称和一些从StreamingContext到我们环境的隐式转换,以便将有用的方法添加到我们需要的其他类中。StreamingContext是全部streaming功能的主要点。我们创建一个带有两个执行线程的本地StreamingContext,并且它的批次间隔为1秒。

1
2
3
4
5
6
7
8
9
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用这个context,我们能够创建一个DStream,这个DStream代表了来自TCP数据源的流数据,TCP数据源通过主机名(如localhost)和端口号(如9999)来指定。

1
2
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

名为lines的DStream代表了从数据服务器接收到的数据流。DStream的每条记录是一行文本。接下来,我们根据空白字符将行拆分为字。

1
2
// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap是一个一到多的DStream操作,会生成一个新的DStream,生成的方式是根据源DStream中的每条记录生成多条新的记录。在这个例子中,每行将被拆分成多个字,字的流以名为words的DStream来表现。接下来,我们想要对这些字进行计数。

1
2
3
4
5
6
7
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

名为words的DStream之后会映射(一到一的转换)到一个(word, 1)元组的DStream,新的DStream之后会进行reduce计算来计算数据的每个batch中字的频率。最终wordCounts.print()将会打印出每秒生成的计数。
注意,当这些行被执行时,Spark Streaming只会建立计算,只有当它启动时才会执行,在没有启动之前是不会有真正的执行的。要在所有的转换构建完毕后启动进程,最终我们调用:

1
2
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

完整的代码能够在Spark Streaming例子NetworkWordCount中找到。
如果你现在并构建了Spark,你可以如下运行这个例子。首先你需要运行netcat(一个小单元,可以在大多数类Unix系统中找到)作为一个数据服务,启动如下:

1
$ nc -lk 9999

然后,在另一个终端中使用如下命令进行启动:

1
$ ./bin/run-example streaming.NetworkWordCount localhost 9999

然后,在运行netcat server的终端上输入的任何行将会被计数并以每秒的频率打印在屏幕上。你将会看到一些东西,这些东西看起来如下:

1
2
3
4
5
6
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world

1
2
3
4
5
6
7
8
9
10
# TERMINAL 2: RUNNING NetworkWordCount
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

Basic Concepts

接下来,我们越过这个简单的例子,详细描述一下Spark Streaming的基本概念。

Linking

与Spark类似,Spark Streaming可以通过Maven中心得到。要编写你自己的Spark Steaming程序,你需要将下面的依赖添加到你的sbt项目或Maven项目中。

  • Maven

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
    </dependency>
  • sbt

    1
    2
    3
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"
    //有的时候可能需要这样写,因为会在打包的报错:[error] (*:assembly) deduplicate: different file contents found in the following:
    //libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1" % "provided"

要从数据源(像Kafka、Flume、Kinesis)摄取数据,这些数据源没有包含在Spark Streaming的核心API中。你需要添加对应的坐标spark-streaming-xyz_2.11到依赖中。例如,下面是一些常用的:

Source Artifact
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

最新的列表,请参考Maven repository来获取所支持的数据源的坐标的完整列表。

Initialized StreamingContext

要初始化一个Spark Streaming程序,必须要创建一个StreamingContext对象,它是Spark Streaming功能的主要进入点。
一个StreamingContext可以根据一个SparkConf对象来创建。

1
2
3
4
5
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

参数appName是的application要在cluster UI上显示的名称。master是一个Spark、Mesos或YARN集群的RUI,或者是在以本地模式运行时的一个特殊字符串”local[]”。在实践中,当你在一个集群上运行时,你可能不想在程序中硬编码master参数,而是想要在使用spark-submit启动application并在这里作为参数进行接收。然而对于本地测试和单元测试,你可以传递”local[]”,以进程内的方式运行Spark Streaming(在本地系统中发现core的数量)。需要注意的是,在内部会创建一个SparkContext(所有Spark功能的启动点),可以通过ssc.sparkContext来访问。
batch间隔必须要基于延迟要求和可用的系统资源来设置。查看性能优化章节,获取更多信息。
也可以根据一个已经存在的SparkContext对象来创建StreamingContext对象。

1
2
3
4
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

在一个context被定义之后,你还需要做如下的事情:

  • 通过创建input DStreams来定义输入数据源。
  • 通过对DStreams应用transformation和输出操作来定义流的计算。
  • 使用streamingContext.start()来启动数据接收和数据处理。
  • 使用streamingContext.awaitTermination()来等待进程的停止(手动或发生错误)。
  • 使用streamingContext.stop()能够人为的停止进程。
    需要记住的几点:
  • 一旦一个context被启动,则没有新的streaming计算可以被启动或添加到context中。
  • 一旦一个context被停止,它不能被重新启动。
  • 在同一时刻在一个JVM中只有一个StreamingContext可以是活跃的。
  • 在StreamingCotnext的stop()也会停止SparkContext。想要只是停止StreamingContext,设置stop()的可选参数为false,从而不调用stopSparkContext。
  • 一个SparkContext可以重复使用来创建多个StreamingContexts,只要前面的StreamingContext在下一个StreamingContext创建之前被关闭(不关闭SparkContext)即可。

Discretized Streams(DStream)

Discretized Stream或DStream是由Spark Streaming提供的基本抽象。它代表了一个连续的数据流,从数据源接收到的输入数据流或通过对输入流进行转换而生成的处理过的数据流。内部,一个DStream以一个连续的RDDs系列来表示,它是Spark的一个不可变的、分布式数据集(查看Spark Programming Guide获取更多细节)的抽象。
Streaming DStream
任何应用于DStream上的操作都会翻译成为底层RDDs上的操作。例如,之前例子中转换行为字,在名为lines的DStream中的每个RDD上应用flatMap操作来生成名为words的DStream的RDDs。在下图图中展示。
Streaming DStream Operation
这些下层的RDD转换由Spark引擎转换完成。DStream操作隐藏了大部分的细节,并且为了方便,提供开发者一个高级别的API。这些操作在之后的章节中会详细讨论。

Input DStreams and Receivers

输入DStreams是表示那些从streaming源接收输入数据流的DStream。在quick example中,lines是一个输入DStream,它代表了从netcat服务接收数据的流。每个输入DStream(除了文件流,将在本章的后面讨论)与一个ReceiverScala文档Java文档)对象关联,Receiver从一个数据源接收数据并将数据存储到Spark的内存中以便处理。
Spark Streaming提供了两种内建的streaming数据源。

  • 基本数据源:在StreamingContext API中直接可用的数据源。例如:文件系统和socket连接。
  • 高级数据源:像Kafka、Flume、Kinesis等这样的数据源,通过额外的实用工具类可用。正如在linking章节中讨论的,这需要对额外的依赖进行linking。

在本章的稍后,我们将讨论每中类型的数据源中的一些。
如果你想在你的streaming application中并行的接收多个数据流,你可以创建多个输入DStream(在Performance Tuning章节中进一步讨论)。这将创建多个receiver,这些receiver将同时接收多个数据流。注意,一个Spark worker/executor是一个长时间运行的任务,因此它占用分配给Spark Streaming application的一个cores。所以,记住一个Spark Streaming application需要分配足够的cores来处理接收到的数据以及运行receiver(s)(或线程,如果是以本地模式运行)是很重要的。

需要记住的点

  • 当本地运行一个Spark Streaming程序时,不要使用”local”或”local[1]”作为master的URI。这表示只有一个线程被用于运行本地任务。如果你使用了一个基于receiver的输入DStream(如sockets、Kafka、Flume等),那么单线程会被用于运行receiver,就没有线程来处理接收到的数据。因此,当以本地模式运行时,总是使用”local[n]”作为master URI,其中n大于运行receiver的数量(查看Spark Properties获取如何设置master的信息)。
  • 扩展到在一个集群上运行的逻辑,分配给Spark Streaming application的cores的数量必须多于receivers的数量。否则系统将会接收数据,但是不能够处理它。

Basic Sources

quick example中我们已经看到了ssc.socketTextStream(…),它根据一个接收文本数据的TCP socket连接创建一个DStream。除了sockets,StreamingContext API还提供了根据文件作为数据源创建DStream的方法。

  • File Stream:用于从任何与HDFS API符合的文件系统(HDFS、S3、NFS等)上的文件读取数据,可以这样创建一个DStream:
    1
    streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming将会监控目录dataDirectory,并处理在这个文件中创建的任何文件(不支持目录中文件的嵌套)。注意

  • 文件必须有相同的数据格式。
  • 文件必须是通过原子级别移动或重命名到数据目录的方式在dataDirectory中创建的。
  • 一旦被移动,这些文件不能再被更改。如果这些文件继续被追加数据,那么新的数据将不会被读取到。
    对于简单的文本文件,有一个更加简单的方法streamingContext.textFileStream(dataDirectory)。文件流不要求运行一个receiver,因此不需要分配core。
    Python API fileStream在Python API中不可用,只有textFileStream是可用的。
  • Streams based on Custom Receiver:能够使用通过自定义receivers接收的数据流来创建DStream。查看Custom Receiver GuideDStream Akka获取更多细节。
  • Queue of RDDs as a Stream:要使用测试数据测试一个Spark Streaming application,还可以基于一个RDDs队列来创建一个DStream,需要使用streamingContext.queueStream(queueOfRDDs)。每个被推送到队列中的RDD将被按照DStream中的一个数据批次来对待,并且像流一样进行处理。
    对于来自sockets和files的streams的更多细节,请参考相关功能的API文档,对于Scala请参考StreamingContext,对于Java请参考JavaStreamingContext,对于Python请参考StreamingContext

Advanced Sources

Python API 从Spark 2.0.0起,Kafka、Kinesis和Flume在Python的API中都是可用的。
这类数据源要求与外部非Spark库进行接口通信,它们其中一些带有复杂的依赖(如Kafka和Flume)。因此,要减少依赖版本冲突的相关问题,根据这些数据源创建DStream的功能被移动到单独的包中,在需要的时候可以进行明确的link
注意这些高级的数据源在Spark shell中是不支持的,因此基于这些数据源的application不能在shell中测试。如果你真的想要在Spark shell中使用它们,你需要下载对应的Maven依赖JAR,并将其添加到classpath中。
一些高级的数据源如下:

Custom Sources

Python API 在Python中还不支持。
还是可以使用自定义数据源来创建输入DStream。所有你需要做的是实现用户定义的receiver(查看下一节来了解它是什么),它能够从自定义数据源接收数据并推送到Spark中。查看Custom Receiver Guide获取更多细节。

Receiver Reliability

基于数据源的可靠性,有两种类型的数据源。数据源(像Kafka和Flume)允许转入的数据被确认。如果系统从可靠数据源接收数据,它们能够正确的对接收到的数据进行确认,它能确保在任何失败情况下不会有数据的丢失。这导致了两种类型的receivers:

  • Reliable Receiver - 一个可靠的receiver当数据被接收并以备份的方式存储到Spark中时,会正确的发送确认信息给可靠的数据源。
  • Unreliable Receiver - 一个不可靠的receiver不会给数据源发送确认信息。这可以用于那些不支持确认的数据源,或不想进入复杂确认逻辑的可靠数据源。

如何编写一个可靠的receiver的细节我们将在Custom Receiver Guide中讨论。

Transformations on DStream

与RDDs类似,transformation允许来自输入DStream的数据被修改。DStream支持很多在一般的Spark的RDD上可用的transformation。一些常用的如下:

Transformation Meaning
map(func) 通过将源DStream中的每个数据项传递给一个函数func,而返回一个新的DStream
flatMap(func) 与map类似,但是每个输入项可以映射到0个或多个输出项
filter(func) 使用函数func对源DStream中的记录进行筛选,保留返回true的选项,最终返回一个新的DStream
repartition(numPartitions 通过创建更少或更多的paritions,在这个DStream中修改并行的级别
union(otherStream) 返回一个新的DStream,新的DStream包含源DStream和otherDStream联合的所有数据项
count() 通过对源DStream的每个RDD中数据项计数,返回一个单数据项RDDs的新的DStream
reduce(func) 使用一个函数func(它有两个参数,但是只有一个返回值)对源DStream的每个RDD中的数据项进行聚合,返回一个不重复数据项的RDDs的新的DStream
countByValue() 当在数据项类型为K的DStream上调用时,返回一个新的(K, Long)元组类型的DStream,其中value是key在源DStream的每个RDD中的频率
reduceByKey(func, [numTasks] 当在一个(K, V)类型的DStream上调用时,返回一个新的(K, V)类型的DStream,其中的value是使用函数func对每个key进行聚合的值。注意:默认情况,这将使用Spark的默认并行任务数(本地模式为2,在集群模式中,这个数量通过spark.default.parallelism来决定)进行分组。你可以传递一个可选参数numTasks来设置一个不同的任务数
join(otherStream, [numTasks] 当在(K, V)类型DStream和(K, W)类型DStream上调用时,返回一个(K, (V, W))类型的新的DStream。value是每个key所有的数据项的组合
cogroup(otherStream, [numTasks] 当在(K, V)类型DStream和(K, W)类型DStream上调用时,返回一个(K, Seq[V], Seq[W])元组类型的DStream
transform(func) 对源DStream的每个RDD应用RDD-to-RDD的函数来返回一个新的DStream。这可以用来在DStream上做任意的RDD操作
updateStateByKey(func) 返回一个新的”state”DStream,其中的state通过给定函数在每个key在之前state上进行更新得到的新值。这可以用于为每个key保存任意的状态数据

这些transformation中的一些是值得更加详细的讨论的。

UpdateStateByKey Operation

updateStateBykey操作允许你在使用新的信息连续更新它时保存任意的状态信息。要使用这个,你有两个步骤来做:

  • Define the state - 状态可以是任意的数据类型
  • Define the state update function - 指定一个函数来说明如何使用之前的状态和来自输入DStream的值来更新状态信息。

在每个batch中,Spark将为所有存在的key应用状态更新函数,不管这些keys在batch中是否有新的数据。如果更新函数返回None,那么key-value对将会被消除。
我们使用一个例子进行阐明。假设你想要维持一个在文本数据流中遇到的字的统计。这里,状态是计数,是Integer类型。我这样定义更新函数:

1
2
3
4
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}

这在一个包含字的DStream上应用(假设在之前例子中,pairs是包含(words, 1)对的DStream)。

1
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函数将会为每个字进行调用,newValues是一个元素个数为1的序列(来自(word, 1)对),runningCount是之前的数量。
注意,使用updateStateByKey,要求checkpoint目录已经配置,checkpoint目录我们会在checkpointing章节详细讨论。

Tansform Operation

transform操作(它的变体transformWith也一样)允许任意的RDD-to-RDD函数被应用到一个DStream上。它能够被用于执行那些没有在DStream API中出现的任何RDD操作。例如,将一个数据流中的每个batch与另一个数据集进行join操作的功能没有直接在DStream API中出现。然而你可以使用transform轻松的实现它。它非常强大。例如,通过将预先计算的数据与输入数据流进行join来实时清理数据并基于数据进行过滤。

1
2
3
4
5
6
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})

注意,这个功能将每个batch间隔调用一次。这允许你做根据时间变化的RDD操作,那就是,RDD操作、partitions的数量、广播变量等,能够在batches之间变动。

Window Operations

Spark Streaming还提供了窗口化操作,这允许你在一个数据滑动窗口上应用transformations。下面的图阐述了滑动窗口。
Streaming DStream window
正如在图中显示的,每个时刻窗口滑动一个源DStream,落入窗口的源RDDs被组合和操作生成窗口化DStream的RDDs。在这个特殊的例子中,这个操作被应用到最后三个时间单位的数据上,并且滑动了两个时间单位。这表示,任何窗口操作都需要指定两个参数。

  • window length - 窗口的期间(图中为3)。
  • sliding interval - 窗口操作执行的间隔(图中为2)。
    这两个参数必须是源DStream的batch间隔的倍数(图中为1)。
    我们使用一个例子来阐明窗口操作。假设,你想要扩展之前的例子,每十秒钟生成最后30秒数据的字数统计。要达到这个目的,我们需要在最后30秒数据的(word, 1)类型DStream上应用reduceByKey操作。这通过使用操作reduceByKeyAndWindow来完成。
    1
    2
    // Reduce last 30 seconds of data, every 10 seconds
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常用的窗口操作如下。所有的这些操作都需要两个参数- windowLength和slideInterval。

Transformation Meaning
window(windowLength, slideInterval) 返回一个新的DStream,新的DStream是基于窗口化的源DStream的batches计算的
countByWindow(windowLength, slideInterval) 返回一个滑动窗口,记录流中数据项的个数
reduceByWindow(fun, windowLength, slidenInterval) 返回一个不重复数据项stream,通过在一个滑动间隔上使用func函数来聚合stream中的数据项进行创建。这个函数应该被组合或交换,因此能够正确的并行计算
reduceByKeyAndWindow(func, windowLength,
slideInterval, [numTasks])
当在一个(K, V)类型的DStream上调用时,返回一个新的(K, V)类型的DStream,其中value是为每个key使用给定的reduce函数func在一个滑动窗口中batch上聚合的值。注意默认情况下,这将使用Spark的默认并行任务数据(对于本地模式为2,对于集群模式这个数量由配置属性spark.default.parallelism来决定)来进行分组。你可以传递一个可选参数numTasks来设置一个不同数量的任务数
reduceByKeyAndWindow(func, invFunc, windowLength,
slideInterval,[numTasks])
一个比上面reduceByKeyAndWindow()更加有效的版本,其中每个窗口的reduce值使用前一个窗口的reduce值进行递增计算而得到。这是通过对进入滑动窗口的新的数据进行reducing完成的,并且对来开窗口的旧数据进行反转reducing。一个例子可能是对窗口划过的key的数量进行加法和减法。然而它只适用于“反转reduce函数”,就是这些函数要有一个“反转reduce”函数(如参数invFunc)。和reduceKeyAndWindow,reduce任务的数量可以通过一个可选参数来配置。注意,要使用这个操作,checkpointing必须启用。
countByValueAndWindow(windowLength, slideInterval,
[numTasks])
当在一个(K, V)类型的DStream上调用时,返回一个新的(K, Long)类型的DStream,其中value是每个key在滑动窗口内出现的频率。类似reeduceByKeyAndWindow,reduce任务的数量通过可选参数可以配置

Join Operations

最后,值得对如何在Spark Streaming中轻松执行不同类型的joins操作进行重点介绍。

Stream-stream joins

Streams能够很容易的和其他streams进行join。

1
2
3
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

这里,在每个batch间隔中,由stream1生成的RDD将会与由stream2生成的RDD进行join。你还可以做leftOuterJoin、rightOuterJoin、fullOuterJoin。此外,在streams的窗口上做joins常常也是非常有用的。它相当容易,如下。

1
2
3
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

在解释DStream.transform操作的时候已经对此介绍过了。这里还有另外一个例子,这个例子使用一个dataset与一个窗口化的stream进行join。

1
2
3
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

实际上,你能够动态的变更你想要join的数据集。这个功能供transform来评估每个batch间隔,因此它将使用当前dataset引用的数据集。
DStream的整个transformations列表在API文档中是可用的。对于Scala API,参考DStreamPairDStreamFunctions。对于Java API,查看JavaDStreamJavaPairDStream。对于Python API,查看DStream

Output Operations on DStream

输出操作允许DStream的数据被推送到外部系统,比如一个数据库或一个文件系统。因为输出操作真正的允许被转换的数据被外部系统所消费,它们触发所有DStream transformations(和RDDs的actions类似)真正执行。当前,定义了如下输出操作:

Output Operation Meaning
print() 打印一个DStream中每个batch的前10个数据项到运行streaming application的driver节点。这对于部署和调试是非常有用的。Python API 在Python API需要调用pprint()
saveAsTextFiles(prefix, [suffix]) 以文本文件保存这个DStream的内容。每个batch间隔的文件的名称基于prefix和suffix生成:prefix-TIME_IN_MS[.suffix]
saveAsObjectFiles(refix, [fuffix]) 以序列化的Java对象的序列化文件保存DStream的内容。每个batch间隔文件的名称基于prefix和suffix生成:prefix-TIME_IN_MS[.suffix]。Python API 在Python API中不可用
saveAsHadoopFiles(prefix, [suffix]) 以Hadoop文件保存这个DStream的内容。每个batch间隔的文件名称基于prefix和suffix生成:prefix-TIME_IN_MS[.suffix]。Python API 在Python API中不可用
foreachRDD(func) 最通用的输出操作器,它在由stream生成的每个RDD上应用一个函数func。这个函数应该将每个RDD的数据推送到外部系统,例如将RDD保存为文件,或通过网络将RDD写到数据库。注意,函数func在运行streaming application的driver进程中被执行,并且通常会有RDD actions在driver进程中来触发streaming RDDs的计算

Design Patterns for using foreachRDD

dstream.foreachRDD是一个强大的方法的,它允许数据被发送到外部系统。然而,例如如何正确的、有效的使用这个方法是很重要的。一些常见可避免的错误如下。
通常写数据到外部系统需要创建一个连接对象(例如,TCP连接到远程服务器)并使用连接对象发送数据到远程系统。为了这个目的,开发者可能不经意的尝试在Spark driver上创建一个连接对象,然后尝试在Spark worker中使用它来保存RDDs中的记录。例如(使用Scala语言),

1
2
3
4
5
6
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}

这是不正确的,因为这要求连接对象被序列化,并从driver发送给worker。而这样的连接对象很难跨机器传输。这个错误可能表现为序列化错误(连接对象不能序列化)、初始化错误(连接对象需要在workers上被初始化),等。正确的解决方法是在worker上创建连接对象。

然而,这回导致另一个常见错误 - 为每一条记录创建一个连接。例如:

1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}

通常,创建一个连接对象需要时间和资源的开销。因此,为每条记录创建和销毁一个连接对象会引发不必要的过高负载并大大的降低系统整体的吞吐量。一个比较好的解决方法是使用rdd.foreachPartition - 创建单个连接,并使用这个连接发送一个RDD partition中的所有记录。

1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}

这将在很多记录上平摊连接创建的开销。
最后,通过跨多个RDDs/batches重用连接,能够进一步优化。一种方法是维持一个静态连接池,可以被多个要被推送到外部系统的batches的RDDs使用,因此能偶进一步降低负载。

1
2
3
4
5
6
7
8
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}

注意,在连接池中的连接应该是后台进程延迟创建的,并且如果一段时间不用,应该可以超时。这实现了更加有效的发送数据到外部系统。
其它需要记住的:

  • DStream通过输出操作是懒执行的,像RDDs被RDD actions懒执行一样。明确的说,DStream输出操作中的RDD action触发了被接收到的数据的处理。因此,如果你的application没有任何输出操作,或者有dstream.foreachRDD()这样的输出操作,但是其中没有任何RDD action,也是不会有任何东西执行的。系统将会简单接收数据然后丢掉。
  • 默认情况下,输出操作同一时刻只执行一个。它们以它们在application中定义的顺序被执行。

Accumulators and Broadcast Variables

AccumulatorsBroadcast variables在Spark Streaming不能够从checkpint中恢复。如果你启用了checkpoint,同时还使用了Accumulators或Broadcast variables,你将必须为Accumulators和Broadcast variables创建一个懒实例化的单实例,那么它们能够在driver因失败而重启后重新被实例化。下面的例子展示了这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})

查看完整的源代码

DataFrame and SQL Operations

你能够在streaming data上轻松的使用DataFrames and SQL。你需要创建使用SparkContext创建一个SparkSession,SparkContext是StreamingContext使用的那个。而且这是必须要做的,因此在driver失败后能够重启。这是通过创建一个懒实例化的SparkSession单实例来完成。这在下面的例子中展示了。它修改了之前word count example,使用DataFrames and SQL来生成字的数量。每个RDD被转变为一个DataFrame,作为一个临时表进行注册,然后使用SQL查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}

查看完整源代码
你还能够从不同的线程中(异步运行StreamingContext)对定义在streaming数据上的tables执行SQL查询。只要确保设置StreamingContext能够记住足够的streaming数据,以便查询执行。否则StreamingContext无法察觉异步SQL查询,将会在查询完成之前将旧的streaming数据删除。例如,如果你想查询最后的batch,但是你的查询需要花费5分钟,那么调用streamingContext.remember(Minutes(5))(在Scala,其他语言也一样)。
查看DataFrames and SQL指南,学习关于DataFrames的更多信息。

MLlib Operations

你还可以轻松的使用由MLlib提供的机器学习算法。首先,这些streaming机器学习算法(如 Streaming Linear Regression, Streaming KMeans等)能够从streaming数据学习的同时在这些streaming数据上应用。除了这些,对于一些较大的机器学习算法类,你可以离线训练学习模型(使用历史数据)然后对在线的数据进行应用。查看MLlib指南获取更多细节。

Caching / Persistence

与RDDs相似,DStream允许开发者将stream的数据保存到内存中。在一个DStream上使用persist()方法会自动的将对应DStream的每个RDD保存到内存中。如果DStream中的数据会被计算多次(在相同数据上计算多次),这将是非常有用的。对于基于窗口的操作(像reduceByWindow和reduceByKeyAndWindow)和基于状态的操作(像updateStateByKey),隐含是自动保存的。因此,通过基于窗口的操作生成的DStream将自动保存到内存中,而不需要开发者调用persist()方法。
对于通过网络(如Kafka、Flume、socket等)接收数据的输入stream,为了容灾,将设置默认存储级别,将数据复制到两个节点。
注意,和RDDs不同,DStream的默认存储级别会将数据序列化到内存中。在Performance Tuning章节中会进一步讨论。不同存储级别的更多信息可以在Spark Programming Guide中找到。

Checkpointing

一个streaming application必须是7 * 24小时的运行,因此必须有能力面那些对与application逻辑无关(如系统故障、JVM crash等)的故障的能力。为了达到这个目的,Spark Streaming需要对一个容灾存储系统checkpoint足够的信息,这样它才能从故障中恢复。有两种类型的数据需要被checkpoint。

  • Metadata checkpointing - 保存定义streaming计算的信息到容才存储中(例如hDFS)。这用于从运行streaming application driver的节点的故障中恢复(稍后详细讨论)。元数据包括:
    • Configuration - 用于创建streaming application的配置。
    • DStream operations - 定义streaming application的DStream操作集合。
    • Incomplete batches - 放到队列但还没有完成的job所属Batches。
  • Data checkpointing - 保存生成的RDDs到可靠存储。在一些跨多个batches组合数据的状态化transformations中,这是必须的。在这样的transformations中,生成的RDDs依赖于RDDs之前的batches,这会引发依赖链的长度随着时间而增长。要避免在恢复时刻无尽的增长(与时间成正比),状态化的transformations的中间RDDs周期性的被checkpoint到可靠存储中(如HDFS)来截断依赖链。

作为总结,元数据checkpointing对于从driver故障中恢复是主要需要的,然而数据或RDD checkpoing对于基本功能是必须的,如果状态化transformations被使用。

When to enable Checkpointing

有任何如下需求的application,都需要启用checkpointing:

  • Useage of stateful transformation - 如果updateStateByKey或reduceByKeyAndWindow在application中被使用,那么checkpoint目录必须被设置来允许周期性的RDD checkpint。
  • Recovering from failures of driver running the application - 元数据被用于恢复进程信息。
    注意,那些没有没有使用之前提到的状态化transformation的简单的streaming application能够在没有启用checkpointing情况下运行。在这个例子中从driver故障中恢复也是局部的(一些已经接收到但未处理的数据会丢失)。这通常是可以接收的,并且很多Spark Streaming application也是以这种方式运行的。对于非Hadoop环境的支持,期望未来能够支持。

    How to configure Checkpointing

    通过在一个容灾的、可靠的文件系统中(如HDFS、S3等)设置一个目录来启用Checkpinting,这个目录将用于checkpoint信息的保存。这是通过使用streamingContext.checkpoint(checkpointDirectory)来完成的。这将允许你使用之前说的状态化transformation。另外,如果你想要从driver故障中恢复application信息,你应该重写你的streaming application,使其拥有如下行为:
  • 当你的程序是第一次启动时,它会创建一个新的StreamingContext,装配所有的streams,然后调用start()方法。
  • 当你的程序是在driver故障后被重启,它应该根据checkpoint目录中的checkpoint数据重新创建一个StreamingContext。

这些行为可以通过使用StreamingContext.getOrCreate来轻松完成。如下使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory目录存在,那么将根据checkpoint数据重新创建context。如果目录不存在(例如第一次运行),那么将会调用functionToCreateContext函数来创建一个新的context并建立DStream。查看Scala的例子RecoverableNetworkWordCount。这个例子将网络数据的字数统计添加到一个文件中。
另外,要使用getOrCreate,还需要确保driver进程在遇到故障后自动重启。这可以通过运行application的部署架构来完成。这在Deployment章节中进一步讨论。

Deploying Applications

这一章我们讨论部署一个Spark Streaming application的步骤。

Requirements

要运行一个Spark Streaming application,你有如下事情要做。

  • cluster with cluster manager - 这是任何Spark application的普遍要求。并且已经在deployment guide中讨论了。
  • package the application JAR - 你需要将你的streaming application编译到一个JAR中。如果你使用spark-submit来启动application,那么你不需要在JAR中提供Spark和Spark Streaming。然而,如果你的application使用了advanced sources(例如Kafka,Flume),那么你需要将链接到的额外坐标进行打包,将application的依赖打入用于部署application的JAR。例如,一个application使用了KafkaUtils,将需要包含spark-streaming-kafka-0_8.2.11以及所有它涉及的依赖到这个application的JAR中。
  • configuring sufficient memory for the executors - 因为接收到的数据必须保存到内存中,因此executors必须配置足够的内存来保存接收到的数据。注意,如果你要做10分钟的窗口操作,那么系统必须至少保存最后10分钟的数据在内存中。因此,application需要的内存依赖于application中使用的操作。
  • configuring checkpointing - 如果stream application需要它,那么在符合Hadoop API的容灾存储中(HDFS、S3等)必须配置一个目录作为checkpoint目录,streaming application以一种方式写入checkpoint信息,这些信息可以用于恢复故障。查看Checkpointing章节获取更多信息。
  • configuring automatic restart of the application driver - 要自动的从一个driver故障中恢复,那么用来运行streaming application的部署基础架构必须进行监控并在它故障的时候重启动。不同的cluster managers有不同的工具来完成。
    • Spark Standalone - 一个Spark application driver能够被提交并运行在Spark Standalone cluster中(查看cluster deploy mode),那就是application driver自己运行在worker节点中的一个上。此外,Standalone集群管理器被引入来监督driver,如果driver以非0代码退出而失败时或在运行driver的节点故障时重启driver。在Spark Standalone guide中查看集群模式和集群监督来获取更多信息。
    • YARN - Yarn对自动重启一个application的支持方式类似。请参考YARN文档获取更多信息。
    • Mesos - Marathon被用来在Mesos中解决自动重启。
  • configuring write ahead logs - 从Spark1.2开始,为了实现强壮的容灾保证,我们引入了write ahead logs。如果启用,所有从receiver接收到的数据写入到ahead日志中,ahead日志位于配置的checkpoint目录中。这防止在driver恢复时数据丢失,因此能够确保零数据丢失(将在Fault-tolerance Semantics章节中讨论)。通过设置配置参数spark.streaming.receiver.writeAheadLog.enable为true来启用该功能。然而这个强壮的语义可能会带来个别receivers接收吞吐量的代价。通过执行more receivers in parallel来增加平均吞吐量来得到修正。另外,推荐当写ahead日志被启用时,禁用Spark中接收到数据的响应,因为日志已经存储到了可靠的存储系统中。这可以通过设置输入stream的存储级别为StorageLevel.MEMORY_AND_DISK_SER来完成。然而当为ahead日志使用S3(或任何不支持flush的文件系统)时,请记住启用spark.streaming.dirver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。查看Spark Streaming Configuration获取更多信息。
  • setting the max receiving rate - 如果集群数据源不足够大,以便让streaming application的数据处理速度跟数据被接收的速度一样快,这些receiver能够通过设置一个最大速度来进行限制,限制每秒的记录数。查看configuration parametersspark.streaming.receiver.maxRate(为receivers)和spark.streaming.kafka.maxRatePerPartition(为直接使用Kafka的方法)。在Spark 1.5中,我们还引入了一个特征叫做backpressure,它淘汰了设置速度限制的需要,它能够自动指出速度限制值并如果处理的条件变化了能够自动调整。通过配置参数spark.streaming.backpressure.enabled为true来启用backpressure。

Upgrading Application Code

如果一个运行中的Spark Streaming application需要使用新的代码来更新,有两种可行的机制。

  • 更新的Spark Streaming application被启动并且和为更新的application并行运行。一旦新的application(和旧的application一样接收相同的数据)已经热启动,就已经准备就绪了,那么旧的application就可以关闭了。注意,这能够用于那些数据源支持将数据发送到两个目的地(之前的application和更新后的application)的数据源,而且是依赖于这种数据源来完成的。
  • 将正在运行的application优雅的关闭(查看StreamingContext.stop(…)JavaStreamingContext.stop(…)获取优雅关闭选项),这样能够确保在关闭之前所有接收到的数据都得到处理。然后可以启动更新的application,这样能够在之前application中止的位置开始处理。注意,这种处理方式只能对输入数据源支持数据源端支持缓存(如Kafka和Flume)的有效,因为数据需要在之前application关闭且更新的application还未启动期间进行缓存。根据之前未更新代码的checkpoint信息重启不能够成功。checkpoint信息本质上包含了序列化的Scala/Java/Python对象,尝试使用新的修改后的类来反序列化可能会导致错误。在这种情况中,使用一个不同的checkpoint目录来启动新的application,或将之前的checkpoint目录删除掉。

Monitoring Applications

除了Spark的monitoring capabilities,针对Spark Streaming还有一些附加的能力。当使用一个StreamingContext时,Spark web UI额外展示一个streaming的tab页,用来展示正在运行的receivers(receivers是否活跃,接收到的记录数,receiver的错误等)和完成的batches(batch处理时间,队列延迟等)的统计信息。这能够用于监控streaming application的进程。
web UI中的如下两个metrics是相当重要的:

  • Processing Time - 处理每个数据batch的时间。
  • Scheduling Delay - 一个batch在队列中等待前面batches处理完成所需时间。
    如果batch处理时间一直多于batch间隔,并且队列排队延迟保持增长,那么它表示系统对batch的处理速度跟不上batch的生成速度并且会落后。在这种情况中,需要考虑降低batch的处理时间。
    使用StreamingListener接口,一个Spark Streaming程序的进程也能够被监控,这个接口使你能够得到receiver状态和处理时间。注意,这是一个开发者API,之后很有可能会在此基础上进行优化(报告更多的信息)。

Performance Tuning

获得一个Spark Streaming application在集群上的最好性能需要一些调整。这一章会解释一定数量的参数和配置,用来调整以提高你的application的性能。在高级别,你需要考虑两个事情:
1、通过有效的使用集群资源来降低数据每个batch的处理时间。
2、设置正确的batch大小,数据的batch设置的大小以它们的处理速度和它们的接收速度一样快(数据的处理跟得上数据的接收)。

Reducing the Batch Processing Times

这里有一些优化可以在Spark中完成以减少每个batch的处理时间。这些已经在Tuning Guide中讨论过了。这一章我们聚焦一些更加重要的。

Level of Parallelism in Data Receiving

跨网络接收数据(如Kafka、Flume、socket等)要求数据被解序列化并存储在Spark中。如果数据接收成为系统的瓶颈,那么需要考虑并行接收数据。注意,每个输入DStream创建单个receiver(运行在一个worker机器上),它接收单个数据流。通过创建多个输入流并配置它们接收来自数据源的数据流的不同partitions来实现接收多个数据流。例如,单个Kafka输入DStream接收数据的两个topic,能够拆分到两个Kafka输入流,每个输入流接收一个topic。这需要运行两个receivers,允许数据并行接收,因此增加了整体的吞吐量。这多个DStreams能够联合在一起来创建单个DStream。那么用于单个输入DStream的transformation能够被应用到统一的stream上。这如下完成。

1
2
3
4
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

另外一个需要考虑的参数是receiver的阻塞间隔,由配置参数spark.streaming.blockInterval所决定。对于大多数的receiver,接收到的数据在存储到Spark的内存中之前会合并到数据块中。每个batch中的数据块的数量由tasks的数量决定,这些tasks用于处理(类似map这样的transformation)接收到的数据。每个batch每个receiver的任务数是近似的(batch interval / block interval)。例如,200毫秒的block间隔,每个2秒的batches将创建10个tasks。如果tasks的数量太低(假如,比每个机器的core的数量少),那么它将是低效的,因为所有可用的core没有全部用于处理数据。要为给定的batch间隔增加tasks的数量,可以降低block间隔。然而,推荐的最小block间隔值为50毫秒,低于这个值,task的启动负载将是一个问题。
以多输入流/recievers来接收数据的另一个选择是复制一定的输入数据流(使用inputStream.repartition())。这会在进一步处理数据之前在集群中跨指定数量的机器上分发接收到的数据batches。

Level of Parallelism in Data Processing

在计算的任何阶段,如果并行的tasks的数量不是足够高的,那么集群的资源利用率很低。例如,对于分布式的reduce操作(像reduceByKey和reduceByKeyAndWindow),默认的并行任务数有spark.default.parallelism配置属性控制。你能够传递一个并行级别作为参数(查看PairDStreamFunctions文档)或设置spark.default.parallelism配置参数来修改默认值。

Data Serialization

通过调整序列化格式能够降低数据序列化的负载。在streaming的情况中,有两种类型的数据被序列化。

  • input data:默认,通过Receivers接收到的输入数据以StorageLevel.MEMORY_AND_DISK_SER_2存储到executor的内存中。那样,数据被序列化到字节中以降低GC开销,并且为了对executor进行容灾而备份存储。同样的,数据首先保存在内存中,只有当内存没有足够的空间为streaming计算保存所有的输入数据时才会将数据溢出到磁盘。序列化显然会有负载 - receiver必须对接收到的数据进行解序列化,然后使用Spark的序列化格式重新序列化。
  • Persisted RDDs generated by Streaming Operations:通过streaming计算生成的RDDs可能会保存到内存中。例如,窗口操作保存数据到内存中,因为这些数据将被处理多次。然而,和Spark Core的默认级别StorageLevel.MEMORY_ONLY不同,有streaming计算生成的RDDs默认以StorageLevel.MEMORY_ONLY_SER进行保存,以减少GC负载。

在上面两种情况中,使用Kryo序列化能够降低CPU和内存的开销。查看Spark Tuning Guide获取更多详情。对于Kryo,考虑到注册自定义类禁用对象引用跟踪(在Configuration Guide中查看Kryo-related配置)。
在那些需要为streaming application保存的数量不大的特殊情况中,以非序列化对象来保存数据可能可以,而且不会导致过多的GC负载。例如,如果你是用了几秒的batch间隔并且没有窗口操作,那么你能够通过明确的设置相应的存储级别来禁用序列化保存数据。这将会减少在序列化期间的cpu开销,没有太多的GC开销则潜在的提高了执行性能。

Task Launching Overheads

如果每秒启动的tasks数很高(假设每秒50个或更多),那么发送tasks给slave的负载可能是很显著的,并且使它很难实现秒内的延迟。这个负载能够通过如下的变更来降低:

这些变更可能会降低batch处理时间(毫秒的百倍),因此允许秒内的batch大小。

Setting the Right Batch Interval

为了让运行在集群上的Streaming application稳定,系统处理数据的速度应该和接收数据的速度一样快。换句话说,数据batches的处理速度应该和它们的生成速度一样快。对于一个application是否是这样,可以在streaming web UI中通过monitoring处理时间来找到,其中batch处理时间应该小于batch间隔时间。
依赖于streaming计算的本质,使用的batch间隔可能在数据速率上有重大的影响,它能够被application在一组固定的集群数据源上所维持。例如,我们考虑一下之前WordCountNetwork的例子。对于一个特定的数据速率,系统可能能够维持每两秒报告一次字数的统计(batch间隔为2秒),而每500毫秒却不行。因此batch间隔需要被设置符合期望的数据生成速率以便持续。
一个为你的application指出正确batch的size的好方法是使用一个保守的batch间隔(假设5-10秒)和一个低的数据速率来测试。要核实系统是否能够跟得上数据的速度,你能够通过每个被处理的batch(在Spark driver log4j的日志中查找”total delay”或使用StreamingListener接口)检查端对端延迟的值。如果这个延迟能够比得上batch的size,那么这个系统是稳定的。否则,如果这个延迟连续增长,这意味着系统不能够跟得上,因此是不稳定的。一旦你有了一个稳定的配置,你能够尝试增加数据速度或降低batch的size。注意临时数据速率增长期间的延迟短暂增长是没有问题的,只要延迟能够降低回一个低的值(小于batch的size)。

Memory Tuning

调整Spark内存的使用和Spark GC行为已经在Tuning Guide中已经讨论的非常详细了。强烈推荐你读它。在本章中,我们讨论一些针对Spark Streaming applications的context中的参数。
一个Spark Streaming application需要的集群内存总数依赖于使用的transformation。例如,如果你想要在最后十分中的数据上使用一个窗口操作,那么你的集群应该有足够的内存来保存10分中的数据。或者,如果你想要对很大数量的key使用updateStateByKey,那么需要的内存会很高。相反的,如果你想要做一个简单的map-filter-store操作,那么需要的内存将非常低。
通常,因为通过receivers接收到的数据以StorageLevel.MEMORY_AND_DISK_SER_2级别进行存储,那些无法装配到内存中的数据将被溢出到磁盘。这可能会降低streaming application的性能,因此建议提供足够的内存供你的streaming application使用。最好尝试并查看一个小Scala代码的内存使用。
另一方面的内存调整是垃圾收集。对于一个streaming application,要求是低延迟,有JVM垃圾回收引起的较大暂停是不好的。
有一些参数能够帮助你调整内存的使用和GC负载:

  • Persistence Level of DStreams: 正如之前在Data Serialization,输入数据和RDDs默认以序列化字节保存。相比非序列化保存,这降低了内存的使用和GC的开销。启用Kryo序列化将进一步降低序列化的大小和内存的使用。进一步降低内存的使用可以通过压缩来完成(查看spark配置spark.rdd.compress),但是会耗费CPU时间。
  • Clearing old data: 默认,所有的输入数据和通过DStream transformations生成的被保存的RDDs会被自动清理。Spark决定合适清理这些数据是基于使用的transformation的。例如,如果你使用10分钟的窗口操作,那么Spark Streaming将要保存最后10分钟的数据,并且积极的丢弃较老的数据。通过设置streamingContext.remember,数据能够被保留一个较长的区间(例如查询较老的数据)。
  • CMS Garbage Collector: 强烈推荐使用并发mark-and-sweep GC来保持GC处于短暂停状态。通过并发GC来降低系统的整体处理吞吐量,推荐使用它还是为了得到始终如一的batch处理时间。确保在driver(在spark-submit中使用–driver-java-options)上和executor(使用Spark配置spark.executor.extraJavaOptions)上设置了CMS GC。
  • Other tips: 要进一步降低GC开销,还有一些其他东西值得尝试。
    • 使用OFF_HEAP存储级别存储RDDs。在Spark Programming Guide中查看详细信息。
    • 使用更多带有更小heap大小的executors。这样能够降低每个JVM heap的CG暂停。

需要记住的几点:

  • 一个DStream与单个receiver关联。为了实现并行读取并行的多个receiver,需要创建多个DStream。一个receiver在一个executor内运行。它占用一个core。确保在receiver订阅一个core之后有足够的的core来进行处理。spark.cores.max会将receiver的订阅算到账户中。receivers以轮转的模式分配给executors。
  • 当数据从一个stream数据源被接收,receiver创建数据的blocks。每个block间隔毫秒创建一个新的数据block。batch间隔期间创建N个数据block,其中N = batchInterval/blockInterval。这些block通过当前executor的BlockManager分布到其他executors的block managers。之后,运行在driver上的网络输入跟踪器被通知得到这些block的位置,用于做进一步的处理。
  • 在driver上为batch间隔期间创建的blocks创建一个RDD。batch间隔期间生成的blocks是这个RDD的partitions。在Spark每个partition是一个任务。blockInterval==batchinterval意味着是单个partition并且可能是本地处理的。
  • blocks上的map任务在executors(一个是接收block的那个,另一个位于block的副本处)中被处理,executor有的blocks和block间隔无关,除非是非本地调度。拥有更大block间隔意味着更大的blocks。一个高的spark.locality.wait值增加了本地节点上一个block的处理机会。需要找出这两个参数之间的一个平衡以确保较大blocks能够本地被处理。
  • 除了依靠batchInterval和blockInterval,通过调用inputDstream.repartitions(n)你还能够定义partitions的数量。这随机的混洗RDD中的数据来创建n个partitions。是的,更好的并发。虽然带来了混洗的开销。一个RDD的处理通过driver的job调度器作为一个job来调度。在给定的时间点,只有一个job活跃。因此,如果一个job正在执行,那么其他的job将会在队列中排队。
  • 如果你有两个dstream,那么将会有形成两个RDDs,并且将会创建两个jobs,这两个job会一个接一个进行调度。要避免这样,你能够联合两个dstreams。这将确保单个联合的RDD是由两个dstreams的RDDs组成的。联合的RDD将被作为单个job来考虑。然而这个RDD的partition是没有影响的。
  • 如果batch处理时间比batchinterval要多,那么很明显receiver的内存将开始堆积最后会抛出异常(很有可能是BlockNotFoundException)。当前没有办法暂停receiver。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制receiver的速度。

Fault-tolerance Semantics

在本章,我们将讨论Spark Streaming applications在故障事件中的行为。

Background

要理解由Spark Streaming提供的概念,让我们记住Spark的RDDs的容灾基础。
1、一个RDD是一个不可变的、可重新计算的分布式数据集。每个RDD记住确定的操作体系,这个操作体系被用于容灾的输入数据集上来创建RDD。
2、如果一个RDD的任何partition在worker节点故障期间丢失,那么那个partition将会根据原始的容灾数据集使用操作体系重新计算出来。
3、假设所有的RDD transformation都是确定的,那么在最终转换的RDD中的数据也将总是与Spark集群中的故障无关的。

Spark在容灾文件系统(例如HDFS或S3)中的数据上操作,因此所有从容灾数据上生成的RDDs也是容灾的。然而这不适合Spark Streaming,因为在多数情况下数据是通过网络接收到的(除了使用fileStream时)。要为所有生成的RDDs达到相同的容灾属性,被接收到的数据被复制到集群中的worker节点中多个executor中(默认的重复因子为2)。这使得在故障时系统中的两种数据需要被恢复:
1、Data received and replicated - 这种数据能够从单个worker节点故障中幸存,因为这个数据的一个拷贝在另外一个节点上。
2、Data received but buffered for replication - 因为这个数据没有重复存放,恢复这个数据的唯一方式是从数据源获取。
此外,有两种类型故障我们应该关心:
1、Failure of a Worker Node - 任何运行executors的worker节点都可能发生故障,并且所有在内存中的数据会丢失。如果有运行在故障节点的receivers,那么它们缓存的数据将会丢失。
2、Failure of the Driver Node - 如果运行Spark Streaming application的driver节点故障,显然SparkContext也被丢失,并且所有executors和他们内存中的数据也会丢失。

有了这些基础知识,我们开始理解Spark Streaming的容灾概念。

Definitions

常常听到的一个streaming系统的语义是每条记录能够被系统处理多少次。有三种类型的保证,由底层可能的操作条件所提供.
1、最多一次:每条记录被处理一次或不被处理。
2、至少一次:每条记录会被处理一次或多次。这中要比最多一次的健壮,因为它保证数据不会丢失。但是可能会重复。
3、只有一次:每条记录明确只处理一次 - 没有数据丢失也没有数据会被处理多次。很明显这是三种保证中最健壮的。

Basic Semantics

在任何stream处理系统中,在数据处理中大概有三个步骤。
1、Receiving the data:使用Receiver或其他的从数据源接收数据。
2、Transforming the data:接收到的数据使用DStream transformation和RDD transformations被转换。
3、pushing out the data:将转换的最终数据推送到外部系统,像文件系统、数据库、dashboards等。
如果一个streaming application需要完成端对端的只有一次的保证,那么每个步骤需要提供一个只有一次的保证。那就是每条记录必须只被接收一次、转换一次并且推送到下游系统一次。我们理解以下在Spark Streaming的context中的这些语义。
1、Receiving the data: 不同的输入数据源提供了不同的保证。这将在下一章中详细讨论。
2、Transforming the data: 所有已经被接收到的数据将只被处理一次,这个保证由RDDs提供。如果有失败,只要接收到的输入数据可以访问,最终转换成的RDDs将总是包含相同的内容。
3、Pushing out the data: 默认输出操作确保为至少一次,因为它依赖于输出操作的类型(是否幂等)和下游系统的语义(是否支持事务)。但是用户能够定义他们自己事务机制来达到只有一次的语义。这将在本章的之后详细讨论。

Semantics of Receivecd Data

不同的输入数据源提供了不同的保证,涵盖了从至少一次到只有一次的所有情况。更加详细的阅读。

With Files

如果所有输入数据都已经位于一个容灾的文件系统中,如HDFS,Spark Streaming能够总是从任何故障中恢复并处理所有的数据。这给了一个只有一次的语义,意味着所有的数据不管有什么失败,只会被处理一次。

With Receiver-based Sources

对于基于receiver的输入数据源,容错依赖于故障情节和redeiver的类型。正如我们之前讨论的,有两种类型的receiver:
1、Reliable Receiver - 这些receiver之后在将数据重复存储后才会给可靠数据源发送确认信息。如果一个这样的receiver失败,这个数据源将不会收到缓存数据的确认。因此,如果receiver被重启,数据源将会重新发送数据,因此不会有数据在故障期间丢失。
2、Unreliable Receiver - 这类receiver不会发送确认,因此当时worker故障或driver故障期间会丢失数据。
根据使用的什么类型的receivers,我们达到如下语义。如果一个worker节点失败,那么使用可靠receiver不会有数据丢失;使用不可靠receiver,也接受但还未复制保存的数据会丢失。如果是driver节点失败,那么除了这些会丢失,所有接收和复制保存的数据也会丢失。这将会影响状态化装换的结果。
要避免已经接收到的数据的丢失,Spark 1.2引入了write ahead log,它保存接收到的数据到容灾存储中。启用write ahead logs配合可靠receiver,可以零数据丢失。在这些语义中,提供至少一次的保障。
下面的表格汇总了故障的含义:

Deployment Scenario Worker Failure Driver Failure
spark 1.1或更早,或没有开启写ahead日志Spark 1.2或更高版本 使用不可靠receiver将会丢失缓存的数据。使用可靠receiver将零数据丢失,至少一次的语义 使用不可靠receivers缓存的数据丢失,任何类型的receiver,已经传递的数据会丢失,未定义的语义
开启了写ahead日志的Spark 1.2或更高版本 使用可靠receiver将零数据丢失,至少一次的语义 使用可靠receivers和文件将零数据丢失,至少一次的语义

With Kafka Direct API

在Spark 1.3版本中,我们引入了一个新的Kafka直接API,它能够确保所有由Spark Streaming接收的Kafka数据只会接收一次。如果你实现了只有一次的输出操作,你可以达到端对端只有一次的保证。这个方法()将在Kafka Integration Guide中进一步讨论。

Semantics of output operations

输出操作(像foreachRDD)有至少一次的语义,那就是被转换的数据在worker故障事件中可能会写出多次。对于保存到文件(使用saveAs*Files)是可以接受的(因为文件对于相同的数据会简单的覆盖),另外可能必须要达到只有一次的语义。有两种方法。

  • Idempotent updates: 尝试多次写相同的数据。例如,saveAs*Files总是写习惯你同的数据来生成文件。
  • Transactional updates: 所有的更新都会产生事务,因此更新只会确切的做一次。唯一这样做的方式如下。
    • 使用batch时间(在foreachRDD中可用)和RDD的partition索引来创建一个唯一标识。这个标识唯一表示streaming application的一条数据。
    • 使用这个唯一标识以事务方式(确切一次,自动的)更新外部系统。如果这个标识还没有提交,提交这个partition的数据和这个标识。否则,如果已经提交,跳过这个更新。
      1
      2
      3
      4
      5
      6
      7
      dstream.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionIterator =>
      val partitionId = TaskContext.get.partitionId()
      val uniqueId = generateUniqueId(time.milliseconds, partitionId)
      // use this uniqueId to transactionally commit the data in partitionIterator
      }
      }

Migration Guide from 0.9.1 or below to 1.x

在Spark 0.9.1和Spark 1.0之间有一些API变更,以确保之后API稳定性。这一章详细的介绍将已存在的代码合并到1.0的步骤。
Input DStreams:所有在一个输入流中创建的操作(例如,StreamingContext.socketStream、FlumeUtils.createStream等),对于Scala,则返回InputDStream或ReceiverInputDStream(代替了DStream),对于Java,则返回JavaInputDStream、JavaPairInputDStream、JavaReceiverInputDStream或JavaPairReceiverInputDStream(代替JavaDStream)。这确保那些特定于输入流的功能在未来能够被添加到这些类,而不需要打破两者的兼容性。注意,你的已存在的Spark Streaming application应该不需要任何改变(因为这些新类是DStream或JavaDStream的子类)处理可能需要使用Spark 1.0进行编译。
Custom Network Receivers:因为要发布Spark Streaming,使用类NetworkReciever,可以在Scala中定义自定义网络receivers。然而这个API在错误处理和reporting中受到限制,并且不能在Java中使用。从Spark 1.0开始,这个类被Receiver所替换,这个Receiver有如下优势。

  • 添加了像stop和restart方法,以便更好的控制一个receiver的生命周期。查看custom receiver guide获取更多的细节。
  • 自定义receiver能够使用Scala和Java来实现。

要将你的自定义receiver从之前的NetworkReceiver合并到最新的Receiver,你需要做如下的事情。

  • 确保你的自定义receiver类继承了org.apache.spark.streaming.receiver.Receiver而不是org.apache.spark.streaming.dstream.NetworkReceiver。
  • 之前,一个BlockGenerator对象需要通过自定义receiver来创建,接收到的数据会被添加到BlockGenerator来存储到Spark中。它需要明确的在onStart()和onStop()方法中启动和停止。新的Receiver类使这成为非必须的,因为它增加了一组名为store()的方法,可以调用这些方法来将数据存储到Spark中。因此,要合并你的网络receiver,移除任何的BlockGenerator对象(在Spark 1.0中不再存在)并在接收到的数据上使用store(…)方法。

Actor-based Receivers:基于actor的Receiver APIs已经被移动到DStream Akka。请参考这个项目获取更多细节。

Where to Go from Here