Flume Install

本文主要介绍自己在生产中使用flume的实际配置,以便以后查询。如果能够为他人提供参考,荣幸之至。
在Flume中分为三个部分source、channel和sink。source主要用于接收数据,sink用于写出数据,channel作为source和sink的连接、保存和转发使用。其中非常好用的是,channel可以使用Kafka,从而使得Flume具有了超强的存储能力,如果在加上可靠的source和sink,完全可以保证数据零丢失。
本文使用的例子中采用的是内存channel,这种channel的缺点是存储长度有限,重启数据丢失,有点就是速度快,低延迟。至于source,使用的是avro。最后是sink,因为我得目的是将数据写入到HDFS中,以后Hadoop集群或Spark集群进行计算,因此使用的hdfs类型的sink。

配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
apiAgent.sources = source1 source2
apiAgent.channels = channel1 channel2
apiAgent.sinks = sink1 sink2 sink3 sink4
apiAgent.channels.channel1.type = memory
apiAgent.channels.channel1.capacity = 1000000
apiAgent.channels.channel1.transactionCapacity = 5000
apiAgent.channels.channel2.type = memory
apiAgent.channels.channel2.capacity = 1000000
apiAgent.channels.channel2.transactionCapacity = 5000
apiAgent.sources.source1.type = avro
apiAgent.sources.source1.port = 50000
apiAgent.sources.source1.channels = channel1
apiAgent.sources.source1.bind = 0.0.0.0
apiAgent.sources.source2.type = avro
apiAgent.sources.source2.port = 50001
apiAgent.sources.source2.channels = channel2
apiAgent.sources.source2.bind = 0.0.0.0
apiAgent.sinks.sink1.type = hdfs
apiAgent.sinks.sink1.channel = channel1
apiAgent.sinks.sink1.hdfs.path = hdfs://user/flume/events/api/%Y/%m/%d/%H/
apiAgent.sinks.sink1.hdfs.filePrefix = api_sink1_server1
apiAgent.sinks.sink1.hdfs.rollInterval = 3600
apiAgent.sinks.sink1.hdfs.rollSize = 0
apiAgent.sinks.sink1.hdfs.rollCount = 0
apiAgent.sinks.sink1.hdfs.batchSize = 1000
apiAgent.sinks.sink1.hdfs.fileType = DataStream
apiAgent.sinks.sink1.hdfs.writeFormat = Text
apiAgent.sinks.sink2.type = hdfs
apiAgent.sinks.sink2.channel = channel1
apiAgent.sinks.sink2.hdfs.path = hdfs:/user/flume/events/api/%Y/%m/%d/%H/
apiAgent.sinks.sink2.hdfs.filePrefix = api_sink2_server1
apiAgent.sinks.sink2.hdfs.rollInterval = 3600
apiAgent.sinks.sink2.hdfs.rollSize = 0
apiAgent.sinks.sink2.hdfs.rollCount = 0
apiAgent.sinks.sink2.hdfs.batchSize = 1000
apiAgent.sinks.sink2.hdfs.fileType = DataStream
apiAgent.sinks.sink2.hdfs.writeFormat = Text
apiAgent.sinks.sink3.type = hdfs
apiAgent.sinks.sink3.channel = channel2
apiAgent.sinks.sink3.hdfs.path = hdfs://user/flume/events/api/%Y/%m/%d/%H/
apiAgent.sinks.sink3.hdfs.filePrefix = api_sink3_server1
apiAgent.sinks.sink3.hdfs.rollInterval = 3600
apiAgent.sinks.sink3.hdfs.rollSize = 0
apiAgent.sinks.sink3.hdfs.rollCount = 0
apiAgent.sinks.sink3.hdfs.batchSize = 1000
apiAgent.sinks.sink3.hdfs.fileType = DataStream
apiAgent.sinks.sink3.hdfs.writeFormat = Text
apiAgent.sinks.sink4.type = hdfs
apiAgent.sinks.sink4.channel = channel2
apiAgent.sinks.sink4.hdfs.path = hdfs://user/flume/events/api/%Y/%m/%d/%H/
apiAgent.sinks.sink4.hdfs.filePrefix = api_sink4_server1
apiAgent.sinks.sink4.hdfs.rollInterval = 3600
apiAgent.sinks.sink4.hdfs.rollSize = 0
apiAgent.sinks.sink4.hdfs.rollCount = 0
apiAgent.sinks.sink4.hdfs.batchSize = 1000
apiAgent.sinks.sink4.hdfs.fileType = DataStream
apiAgent.sinks.sink4.hdfs.writeFormat = Text

补充说明
Flume的集群的节点个数为4,也就是说有4个进程,每台机器上一个。通过source和channel的关系可以看出,有两个source进行接收数据,每个source连接两个channel,每个channel一个sink。filePrefix配置为这样,是怕生成文件的时候有冲突,而且这样定义也方便查找问题,根据文件名可以知道对应的Flume实例。数据的存储是按小时区分的,方便计算数据的时候使用。
因为使用的hdfs类型的sink,因此flume的环境中还要包含Hadoop的环境配置,这样sink才能写输入到HDFS中。
配置的详细解释,请参考Flume配置相关的信息。

写入source使用的依赖如下:

1
2
3
4
5
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.5.2</version>
</dependency>

代码参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Properties props = new Properties();
props.put("client.type", "default_loadbalance");
props.put("hosts", "h1 h2 h3 h4");
props.put("hosts.h1", "flume-001.yz:11000");
props.put("hosts.h2", "flume-002.yz:11000");
props.put("hosts.h3", "flume-003.yz:11000");
props.put("hosts.h4", "flume-004.yz:11000");
props.put("host-selector", "round_robin"); //Flume集群中节点的选取规则
props.put("backoff", "true"); //Flume某个节点故障时,是否冷却(暂时不放到选取的集合中)
props.put("maxBackoff", "10000"); //FLume某个节点故障时,最大冷却时间,冷却时间是随着次数集合增长的,但是最长这个时间
flumeClient = RpcClientFactory.getInstance(props);
flumeClient.appendBatch((List<Event>)obj);
flumeClient.close()

备注:这种写入Flume的方式是非可靠的,因为flumeClient.appendBatch()方法没有返回任何参数,无法确保零数据丢失。但是如果Flume节点挂了,会抛出异常,因此可以通过捕获异常,对数据进行必要的处理以免无谓的丢失。