本文是Kafka 0.10.0文档的翻译,主要用于自学。
1 Getting Started
1.1 Introduction
Kafka是一个分布式的、分区的、备份的提交日志服务。它提供了一个消息传输系统的功能,但是使用了一个独特的设计。
那意味着什么?
首先我们浏览一下基本的消息队列术语:
- Kafka以一种类型持续messages的提供称为topics。
- 我们称那些publish message到一个Kafka topic的进程为producers。
- 我们称那些subscribe到topics并处理被publish的message的进程为consumers。
- kafka作为一个集群而运行,集群由一个或多个server组成,每个server成为一个broker。
因此,整体来看,producers通过网络发送messages到Kafka集群,同样Kafka又为consumers服务,像这样:
clients和servers之间的通信是通过一个简单的、高性能的、跨语言的TCP协议完成的。我们为Kafka提供了一个Java client,但是clients在很多语言中都可用。
Topics and Logs
首先我们学习由Kafka提供的高级别的抽象 - topic。
一个topic是一种或一个提供的名称,用来publish message。对于每个topic,Kafka集群维持着一个分区日志,看起来像这样:
每个partition是一个顺序的、不可变的连续添加的消息队列。partitions中的每个message分配一个序列id号,称为offset,用来唯一标识partition中的每条message。
Kafka集群保存所有publish过来的message-不管它们是否被消费,保存时长可配置。例如,如果日志保存设置为两天,那么一个message在publish后两天内是可以被消费的,但是两天之后,它将被删除以释放空间。kafka的性能对于不同数据大小是恒定有效的,因此很多的数据不是个问题。
实际上每个consumer仅有被保存的元数据是consumer在日志中的位置,称为offset。这个offset由consumer控制:通常一个consumer按照它读取的message,线性的推进它的offset,但是这个位置由consumer控制并且consumer能够以任意顺序消费message。例如一个consumer可以重置到一个原来的offset来重新处理。
这个特征的组合意味着Kafka consumer是非常廉价的 - 它们能够自由的来去,而不会影响集群或其他consumer。例如,你可以使用我们的命令行工具来tail任何topic的内容,而不需要任何已经存在的consumers改变它所消费的内容。
日志中的partitions有几个用途。首先,它们允许日志扩展到单个server所能容纳的日志大小之外。一个partition必须位于它所属的server上,但是一个topic可能有很多的partitions,因此它能够持有任意数量的数据。其次,它们的行为类似一个并行单元 - 汇聚更多于一点。
Distribution
日志的partitions在Kafka集群中跨server分布,每个server为一个共享的partition处理数据和请求。每个partition为了容灾,跨server保存多个备份,备份的数量可以配置。
每个partition有一个server扮演”leader”的角色,并有零个或多个servers扮演”followers”的角色。leader为partition处理所有的读和写的请求,而follower只是被动的复制leader。如果leader失效了,follower中的一个将自动成为新的leader。每个server为它自己一些的partitions扮演一个leader角色,为其他的partition扮演一个follower的角色,因此每个server在集群中的负载是很均衡的。
Producers
Producers将数据publish到它们选择的topics中。Producer负责哪些message被分配到topic的哪些partition中。这可以通过简单的轮转来完成以平衡负载,或者可以一致性的partition定义函数来完成(例如基于message的某些key)。在分区上用的更多的是第二种。
Consumers
传统的消息传输有两种模式:queuing和publish-subscribe。在队列方式中,一个consumers池从一个server中读取,每条message只会到达某个consumer;在发布-订阅方式中,message广播给所有的consumers。Kafka提供了单个consumer抽象,它概括了上面两种方式 - consumer group。
consumers使用一个consumer群名称来标识它们自己,每个publish到一个topic的message被传递到每个订阅了topic的consumer组的一个consumer实例。consumer实例能够在单独的进程或单独的机器上。
如果所有的consumer实例拥有相同的consumer组,那么工作方式与一个传统的跨consumers负载均衡的队列类似。
如果所有的consumer实例都有不同的consumer组,那么工作方式与发布-订阅类似,所有的message会广播给所有的consumer。
更常见的,尽管我们发现那些topics有少数量的consumer组,然而每个都是一个逻辑订阅者。每个组由多个consumer实例组成,这样具有扩展性和容灾性。这也是publish-subscrib的定义,只不过subscriber是一个consumer群,而不是单个进程。
相对于传统消息传输系统,Kafka有更强的顺序保证。
一个传统队列在server上按顺序保存messages,如果多个consumer从队列中消费数据,那么server以message存储的顺序拿出message。然而,虽然server按照顺序拿出message,但是message以异步方式投递给consumers,因此它们可能在不同的consumer上以不同的顺序到达。这意味着在并行消费的情况中消息的顺序丢失了。消息传输系统通过一个”exclusive consumer”的概念来解决这个问题,它只允许一个进程从队列中消费,但是这意味着没有并行处理。
Kafka做的更好一些。通过一个并行概念-partition-在topics中,Kafka能够在一个consumer进程池上同时提供顺序保证和负载均衡。这是通过将topic中的partitions分配给consumer组中的consumers来完成的,因此每个partition有组中确切的一个consumer来消费。通过这样,我们确保consumer值读取那一个partition,并以顺序消费数据。因为有很多partitions在很多consumer实例上是均衡负载的。注意,一个consumer组中的consumer实例不能多余partitions的数量。
Kafka只是在一个partition中提供了一个整体的顺序,而不是在一个topic的不同partition之间。对于大多applications,每个分区的排序联合根据key划分数据的能力是充分的。如果你要求在message上有整体的顺序,这可以通过使用一个topic只有一个partition来完成,这也意味着每个consuemr组只有一个consumer进程。
Guarantees
在高层次上,Kafka给了如下的保证:
- 由一个producer发送到一个特定topic partition的Messages将会以它们被发送的顺序添加。那就是,如果一个message M1与发送message M2的producer是一个,并且M1先被发送,那么M1将有一个比M2小的offset,并且要比M2更早的添加到日志中。
- 一个consumer看到messages的顺序是messages存储的顺序。
- 对于使用了复制因子为N的topic,在不丢失任何提交到log的messages丢失,我们允许最多N-1个server故障。
这些保证的更多细节在文档的design章节中给出。
1.2 Use Cases
这是一个对于一些Apache Kafka流行用例的一个描述。对于这些action的一些概述,请参考blog post
Log Aggregation
很多人使用Kafka是将其作为日志聚合解决方案的替代品。日志聚合通常是收集日志文件并将日志文件日志到一个中心位置(可能是一个文件服务器或HDFS)用于处理。Kakfa从文件细节中抽象,并给出一个清晰的日志或事件数据的抽象来作为一个messages的流。这允许低延迟处理和对多个数据源及分布式数据消费的支持。相较于日志中心系统(如Scribe或Flume),Kafka提供了同样好的性能,并提供了更健壮的持久化保证用于复制,以及更低的端对端延迟。
Stream Processing
很多Kafka用户以处理由多个阶段组成的pipeline的方式处理数据,从Kafka topic中消费原生输入数据然后聚合、提取,或者转换到新的topic做进一步的消费或后续处理。例如,一个用于处理推荐新闻文章的pipeline可能会从RSS feeds中爬取文章内容并将其publish到一个”articles”topic中;进一步的处理可能是格式化或去重内容,并将干净的文章内容publish到一个新的topic;最后的处理阶段可能是尝试推送这些内容给用户。这样的处理流程,基于单独的topics创建实时数据流的的图表。从0.10.0.0开始,一个轻量级但是强大的流处理库(称为Kafka Streams)可以在Apache中使用,用来执行上面描述的数据处理。除了Kafka Streams,可供选择的其他开源的流处理工具包括Apache Storm和Apache Samza。
Event Sourcing
Event sourcing是一个apllication的设计方式,在其中,状态的变更作为一个时间顺序的记录序列被记录日志。 Kafka作为一个极好的后端,支持大量日志数据的存储,为application以这种风格惊醒构建。
Commit Log
Kafka能够作为一种外部日志提交为一个分布式系统提供服务。日志用来帮助在节点之间复制数据,并且行为类似重新同步机制,用于在节点故障时重新存储它们的数据。日志压缩特性在Kafka中用来帮助这种用法。在这种用法中,Kafka与Apache BookKeeper项目类似。
1.3 Quick Start
这个指南假设你刚刚开始,Kafka数据和ZooKeeper数据均为空。
Step 1 : Download the code
下载0.10.0.0发布版,并解压缩它。
Step 2 : Start the server
Kafka使用Zookeeper,因此如果你还没有一个ZooKeeper服务,你需要先启动一个ZooKeeper服务。你能够使用Kafka包内的方便脚本来得到一个quick-and-dirty的单节点ZooKeeper实例。
现在,启动Kafka服务:
Step 3 : Create a topic
创建一个名为”test”的topic,该topic只有一个partition,并且只有一个备份:
如果你运行列出topic的命令,你将看到上面的topic:
Step 4 : Send some messages
Kafka带有一个命令行客户端可以从一个文件或标准输入来获取输入,并将它作为messages发送给Kafka集群。默认每一行作为一个单独的message来发送。
运行producer并输入一些message到控制台来将它发送给server:
Step 5 : Start a consumer
Kafka还有一个命令行consumer,它将会消费message并将message转到标准输出。
如果上面的每个命令运行在不同的terminal中,那么现在你能够在producer terminal中键入messages,然后能够看到这些message会出现在consumer terminal中。
所有这些命令行工具有额外的参数;以无参数方式运行这些命令将会以更加细节的方式列出命令的使用文档。
Step 6 : Setting up a multi-broker cluster
到现在为止,我们已经针对单个broker进行运行,但是那并不好玩。对于Kafka,单个broker只一个size为1的cluster,因此除了启动稍微多一些borker实例,没有什么太多改变。只是为了感受,将我们的节点扩展到3个节点(仍然全部在本机上)。
首先我们为每个broker创建一个配置文件:
现在,编辑这些新文件,并如下设置属性:
这个属性
在集群中是每个节点唯一且永久的名称。我们只是重写了端口和日志目录,因为我们要在相同的机器上运行这些,为了避免它们都尝试注册相同的端口或重写了其他节点的数据。
我们已经有Zookeeper了,并且已经启动了一个节点,因此我们只需要启动两个新的的节点:
现在,创建一个复制因子为3的新的topic:
好了,现在我们已经有一个集群了,但是我们如何能够知道哪个borker在做什么呢?要想看到这些信息需要运行”describe topics”命令:
以下是一个输出说明。第一行,给出了所有partitions的一个汇总,每条额外的行给出一个partition的信息。因为这个topic只有一个partition,所以这里只有一行。
- “leader”是负责对给定的partition进行所有读和写的节点。partitions中的每个节点通过随机选取将可能成为leader。
- “replicas”
+
注意,在我的例子中,节点1是topic仅有partition的leader。
我们能够在原来topic上运行相同的命令,我们将看到:
因此没有什么可惊讶的 - 原来的topic在server0上没有备份,server0是我们集群中唯一的server。
publish一些message到我们的新topic中:
我们消费这些message:
现在测试我们的容灾。borker 1扮演了leader的角色,因此我们kill掉它:
leader关系已经切换到某一个slave,node1不再位于同步备份集合中:
但是messages对于消费依然是可用的,尽管原来写message的leader已经关闭:
Step 7 : Use Kafka Connect to import/export data
从控制台写入数据然后再将数据写出到控制台对于开始学习是很方便的,但是你很可能想要从其他数据源写入数据或从Kafka将数据到处到其他系统。对于很多系统,除了写自定义代码,你能够使用Kafka连接来导入和到处数据。Kafka Connect是包含在Kafka中的一个工具用来导入数据到Kafka或从Kafka中导出数据。它是一个可扩展的用来运行connectors的工具,它实现自定义逻辑来和一个外部系统进行交互。在快速开始中我们将看到如何使用一个简单的连接来运行Kafka,这个连接将从文件中导入数据到一个Kafka topic然后从这个topic中导出数据到一个文件中。首先,我们先创建用来测试的数据:
接下来,我们将以standalone模式启动了connectors,这意味着他们运行在单个、本地的进程中。我们提供了三个配置文件作为参数。第一个总是Kafka Connect进程的配置,包含常用的配置,诸如连接到的Kafka brokers、数据的序列化格式。剩下的配置文件每个指定了一个connector的创建。这些文件包含了一个唯一的connector名称,要实例化的connector类和任何被connector需要的配置。
这些简单的配置文件包含在Kafka中,之前使用默认本地集群配置启动的Kafka Connect,会创建两个connectors:第一个是一个数据源connector,用来从给一个输入文件中读入行并将读到行数据产出到一个Kafka topic;第二个是一个sink connector,用来从Kafka topic读取messages并将每个message作为一行产出到一个输出文件中。在启动期间,你将会看到一定数量的日志信息,包括一些connectors已经被启动的提示。一旦Kafka Connect进程被启动,source connector将开始从下面文件中读取行数据:
并且产出这些行数据到下面topic中:
,而且sink conector将会从下面topic中读取message:
并将读到的message写到下面文件中:
。通过检查输出文件的内容,我们能够检查由整个pipeline传递的数据:
注意,在Kafka topic中存储的数据
,因此我们还能运行一个控制台consumer来查看topic中的数据(或使用自定义consumer代码来处理它):
connectors继续处理数据,因此我们能够添加数据到文件,病看到它通过pipeline移动:
你应该能够看到这行数据出现在控制台consumer中和sink文件中。
Step 8 : Use Kafka Streams to process data
Kafka Streams是一个Kafka客户端库,用于实时的处理和分析存储在Kafka brokers中的数据。这个快速开始的例子将演示如何运行一个通过这个库编码的streaming application。这里是WordCountDemo例子代码的要点()。
它实现WordCount算法,计算每个word在输入文本中出现的频率。然而,不像其他之前你见过的WordCount例子,那些例子的操作有限的数据上,WordCount实例application表现的略微不同因为它被设计操作一个无穷大的、无限的数据流上。与绑定变量类似,它是一个状态化算法,它跟踪并更新word的计数。然而,因为它必须假设未绑定的输入数据,在持续处理更多数据的同时它将周期性的输出它的当前状态和结果,因为它不知道何时能够处理完“所有”数据。
现在,我们将准备输入数据到一个Kafka topic,这些数据将随后被一个Kafka Streams application所处理。
接下来,我们使用console producer将这些输入数据发送到一个名为streams-file-input的输入topic中(在实践中, 流数据像持续流入到Kafka中application启动并运行的地方):
|
|
现在我们可以运行WorkCount application来处理输入数据:
这里将不会有任何的标准输出,除了日志会作为结果持续的回写到Kafka中另一名为streams-wordcount-output的topic中。这个例子将运行一会儿并且不像通常的处理程序那样自动终止。
现在,我们通过application的输出topic中读取数据来检查WordCount application的输出:
将会有如下的输出数据被打印到控制台:
第一列是Kafka message key,第二列是message value,它们都使用java.lang.String格式。注意这里的输出实际上是一个持续的更新流,其中每条数据记录是单个word的更新后的数,记录的key诸如”Kafka”。对于使用相同key的多条记录,后面的记录更新前一条。
现在你能够写更多的输入message到streams-file-input这个topic中并且另外的messages添加到streams-wordcount-output topic中,显示被更新的word的数量(例如,像上面描述的使用console producer和console consumer)。
通过Ctrl-C来停止console consumer。
1.4 Ecosystem
有大量工具可以和Kafka进行集成。ecosystem page列出了这些工具中的一些,包括流处理系统、Hadoop集成、监控和部署工具。
Upgrading
升级暂时不考虑。
2 API
Kafka包含了四个核心APIs:
1、 Producer API:允许applications发送数据流到Kafka集群中的topics。
2、 Consumer API:允许applications从Kafka集群中读取数据流。
3、 Streams API:允许将输入topics中的数据流转换到输出topics。
4、 Connect API:允许实现connectors,从一些数据源系统或application拉取数据到Kafka,或从Kafka中将数据推送到一些sink系统或applicaion。
Kafka在一个独立于一种语言的协议上公开了它的所有功能,在很多编程语言中都有client可用。然而只有Java客户端作为主要Kafka项目的一部分,其他的作为一个独立的开源项目可用。非Java clients的可用列表在这里。
2.1 Procucer API
Producer API允许applications发送数据流到Kafka集群中的topics。
在javadocs中给出的producer如何使用的例子。
要使用producer,你可以使用下面的maven依赖:
2.2 Consumer API
Consumer API允许applications从Kafka集群的topics中读取数据流。
在javadocs中给出了consumer如何使用的例子。
要使用consumer,你需要使用下面的mava依赖:
2.3 Streams API
Streams API允许来自输入topics的数据流转换到输出topics。
如何使用这个库的例子在javadoc中进行了展示。
使用Streams API的其他文档可以在这里找到。
要是用Kafka Streams,你可以使用如下的maven依赖:
2.4 Connect API
Connect API允许来实现connectors来持续从源数据系统拉取数据到Kafka或从Kafka拉取数据到一些sink数据系统。
很多Connect的用户不需要直接使用这个API,他们能够使用预先构建的connectors,而不需要写任何代码。使用Connect的额外信息在这里是可用的。
想要实现自定义connectors的用户可以看这里。
2.5 Legacy APIs
更多遗留的producer和consumer api也包含在Kafka中。这些老的Scala API已经被废弃了,只是为了兼容性的目的而存在。它们的信息可以在这里找到。
3 Configuration
Kafka使用Key-Value对格式的属性文件进行配置。这些值能够通过文件或代码来提供。
3.1 Broker Configs
必不可少的配置如下:
- broker.id
- log.dirs
- zookeeper.connect
Topic级别配置和默认值将在下面更加详细的讨论。
Name | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
zookeeper.connect | Zookeeper主机名的字符串 | string | high | ||
advertised.host.name | 废弃的:只有当’advertised.listeners’或’listeners’都没有设置时使用。使用’advertised.listeners’代替。 | string | null | high |
暂停翻译