Spark Streaming + Kafka Integration Guide

本文是Spark Streaming + Kafka Integration Guide文档的翻译,原文请参考。另外,本文主要用于个人学习使用。

Spark Streaming + Kafka Integration Guide

Apache Kafka是一个发布-订阅的消息队列,作为一个分布式的、分片的、副本提交的日志服务。这里解释如何配置Spark Streaming来从Kafka接收数据。有两种方法来达到这个目的 - 老的方法是使用Receiver和Kafka的高级API,还有一个新的实验性解决方法(在Spark 1.3版本中引入),不需要使用Receiver。它们有不同的编程模式、执行特征和语义保证,因此请仔细阅读。

Approach 1: Receiver-based Approach

这个方法使用一个receiver来接收数据。这个Receiver使用Kafka高级别consumer API来实现。和所有receivers一样,通过一个Receiver从Kafka接收到的数据被存储到Spark executors中,然后Spark Streaming启动jobs来处理数据。
然而,根据默认配置,这个解决方法会因为故障而丢失数据(查看receiver reliabillity。要确保零数据丢失,你需要额外的在Spark Streaming中启用Write Ahead Logs(从Spark 1.2版本中引入)。这将同步的将从Kafka接收到的数据到以写ahead日志的方式波存到分布式文件系统中(如HDFS),因此所有的数据能够从故障中恢复。查看Streaming programming guide中Deploying section获取关于Write Ahead Logs的详细信息)。
接下来,我们讨论如何在你的streaming application中使用这个方法。

1、Linking: 对于使用SBT或Maven项目描述的Scala或Java application,使用如下的坐标链接你的streaming application(查看programming guide中的[Linking section]获取更多信息)。

1
2
3
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.0.0

对于Python application,你需要在部署你的application时添加上面的库以及它的依赖。查看Deploying分项的内容。
2、Programming: 在streaming application代码中,导入KafkaUtils并创建一个输入DStream,如下:

1
2
3
4
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

你还可以指定key和value的类以及它们使用createStream的变体的相关解码类。查看API docsexample

需要注意的几点:

  • Kafka中topic的partitions与Spark Streaming中RDDs生成的partitions没有关联。因此在KafkaUtils.createStream()中增加特定于topic的partition的数量只是增加使用的线程的数量,使用这些线程在单个receiver中对topic进行消费。它不会增加Spark数据的并发处理。参考主要文档获取它的更多信息。
  • 多个Kafka输入DStream能够使用不同的group和topics来创建,以便使用多个receiver来并行接收数据。
  • 如果你使用一个可靠的文件系统(像HDFS)启用了Write Ahead logs,接收到的数据已经被复制到日志中。因此,对于输入流存储的存储级别为StorageLevel.MEMORY_AND_DISK_SER(那就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER))。

3、Deploying: 对于任何Spark application,spark-submit被使用来启动你的application。然而,对于Scala/Java application和Python application之间有轻微的不同。
对于Scala和Java application,如果你使用SBT或Maven作为项目管理,那么打包spark-streaming-kafka-0-8_2.11和它的依赖到application的JAR中。确保spark-core_2.11和spark-streaming_2.11作为依赖进行标记,因为他们已经安装到Spark中了。然后,使用spark-submit来启动你的application(查看主要编程指南中的Deploying section)。
对于Python application,它缺少SBT/Maven项目管理,spark-streaming-kafka-0-8_2.11和它的依赖可以使用–packages直接添加到spark-submit(查看Application Submission Guide)。这样:

1
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 ...

或者,你能够从Maven repository下载Maven坐标的JAR并使用–jars将其添加到spark-submit。

Approach 2: Direct Approach (No Receivers)

这个新的五receiver的直接解决方法已经在Spark 1.3版本中引入,来确保健壮的端对端的保证。代替receivers来接收数据,这个方法周期性的查询Kafka来获取每个topic + partition中最后的offset,然后相应的定义每个batch中处理offset的范围。当处理数据的job启动时,使用Kafka的简单API从Kafka中读取定义的offset范围(类似从文件系统中读取文件)。注意这是一个在Spark 1.3中引入的实验性特征,用于Scala和Java API,在Spark 1.4中才有了Python的API。
这个解决方案在基于receiver的解决方案上有如下优势:

  • Simplified Parallelism: 不需要创建多个输入Kafka streams以及联合他们。使用directStream,Spark Streaming将会创建和要消费的Kafka partitions数量相同的RDD partitions,这样将病习惯你的从Kafka读取数据。因此在Kafka和RDD partitions之间有一个一对一的映射,这样更加容易理解和调整。
  • Efficiency: 在第一个解决方案中要实现零数据丢失需要将数据存储到Write Ahead Log中,这种方式是进一步的复制数据。这实际上是效率低的,因为数据实际上复制了两次-一次被Kafka,另一次被Write Ahead Log。第二种解决方案消除了这个问题,因为没有了receiver,因此不需要Write Ahead Log。只要你有足够的Kafka保留时间,message能够从Kafka中恢复。
  • Exactly-once semantics: 第一种解决方案使用Kafka的高级API将消费掉的offset存储到Zookeeper中。这是从Kafka消费数据的传统方式。而这种解决方案(和write ahead log混合)能够保证零数据丢失(例如至少一次的语义),在一些故障下,有很小的可能性是一些数据会消费两次。这种存在是因为由Spark Streaming可靠的接收的数据和由Zookeeper跟踪的offsets之间的矛盾造成的。因此,在第二解决方案中,我们使用了简单的Kafka API,简单的API没有使用Zookeeper。通过Spark Streaming中它的checkpoints来跟踪offset。这消除了Spark Streaming和Zookeeper/Kafka之间的差异,因此尽管有故障,但是被Spark Streaming接收到的记录实际上只有一次。为了达到输出你的结果只有一次的语义,你的输出操作将数据保存到外部存储中必须是幂等或原子事务的,这样来保存结果和offset(查看主编程指南中Semantics of output operations获取更多信息)。

注意,这种解决方案中一个不利条件是不会在Zookeeper中更新offset,因此那些基于Zookeeper的监控工具将不会更新进度。然而,你能够在这种解决方案中访问每个batch中处理过的offsets并自己更新到Zookeeper(参考下面)。
接下来,我们讨论如何在你的application中使用这种解决方案。

  • Linking: 这种解决方案在Scala和Java application中支持。使用如下的坐标来连接你的SBT/Maven项目(查看主要编程指南中Linking section获取更多信息)。

    1
    2
    3
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-8_2.11
    version = 2.0.0
  • Programming: 在Streaming application代码中,引入KafkaUtils并如下创建一个输入DStream。

    1
    2
    3
    4
    5
    import org.apache.spark.streaming.kafka._
    val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])

你还可以传递一个messageHandlercreateDirectStream来访问MessageAndMetadata,MessageAndMetadata包含了关于当前message的元数据和要将它转换成的目标类型。查看API docsexamples
在Kafka的参数中,你必须指定metadata.broker.list或bootstrap.servers。默认,它将从每个Kafka partition的最后的offset处开始消费。如果你在Kafka参数中设置auto.offset.reset为smallest,那么它将从最小的offset处开始消费。
使用KafkaUtils.createDirectStream的其他变量,你还能够从任意offset处开始消费。此外,如果你想要访问每个batch中已经消费的Kafka offset,你可以如下这么做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}

如果你想要基于Zookeeper的Kafka监控工具来展示streaming application的进度,你还可以使用这个来更新Zookeeper。
注意到HasOffsetRanges的类型转换,将只有它在第一个方法在directKafkaStream上调用完成后才会成功,不会晚于一个方法链(不知道这句是否正确)。你能够使用transform()方法来代替foreachRDD()方法来作为你调用的第一个方法以便访问offsets,然后调用进一步的Spark方法。然而,需要注意的是在任何shuffle或repartition方法之后,RDD partition和Kafka partition之间的一对一映射将不再维持,例如reduceByKey()方法或window()方法。
另一个需要注意的是,因为这个解决方案没有使用receiver,标准的receiver-related(spark.streaming.receiver.格式的配置)将不会应用到由这种解决方案生成的输入DStreams上(但是会应用到其他输入DStream上)。相反,会是用spark.streaming.kafka.的配置。一个重要的东西是spark.streaming.kafka.maxRatePerPartition,它将限制该API的从每个Kafka partition读取数据的速度(每秒message的条数)。

  • Deploying: 这跟第一种解决方案中的相同。