kafka-script

本文主要讨论kafka服务的相关启动和关闭脚本。

kafka-server-start.sh

Kafka服务的启动脚本,正确的用法为 kafka-server-start.sh [-daemon] server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 如果执行脚本时传入的参数小于1个,则退出执行并提示用户需要指定服务属性配置文件, 此处也说明了执行kafka-server-start.sh的正确用法
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties"
exit 1
fi
# $0 表示的是当前shell的文件名,dirname用来获取当前shell文件的所在目录
base_dir=$(dirname $0)
# 读取环境变量中的KAFKA_LOG4J_OPTS的信息,如果没有配置该环境变量,则将kafka目录下conf中的log4j.properties作为配置添加到环境变量中,配置给KAFKA_LOG4J_OPTS
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
# 读取环境变量中KAFKA_HEAP_OPTS的信息,如果没有配置该环境变量,则使用默认配置"-Xmx1G -Xms1G"来配置,并添加到环境变量"KAFKA_HEAP_OPTS"中
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
# 定义一个额外的参数 name,为kafka服务指定了进程名
EXTRA_ARGS="-name kafkaServer -loggc"
# 如果服务要作为后台进程运行,则需要添加-daemon参数,而且这个参数必须是第一个参数,如果第一个参数是-daemon,则为进程添加自定义的名称
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
# 启动kafka服务,由此处也可以看出来,可以使用kafka-run-class.sh来执行相关的类,其中$@表示的是命令行传入的所有参数,这里要启动的类名为kafka.Kafka
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka $@

kafka-server-stop.sh

Kafka服务的停止脚本,其实就是查找KafkaServer对应的进程号,并kill。

1
2
# 在进程中过滤包含"kafka.Kafka"且不包含"grep"的java进程,截取进程号kill掉
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM

kafka-run-class.sh

kafka-run-class.sh是用来运行class的脚本。正确的用法为 kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# 验证kafka-run-class脚本的参数
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 1
fi
# 获取Kafka的基目录,就是当前目录(bin)的上一层目录
base_dir=$(dirname $0)/..
# 创建Kafka的日志目录,首先从环境变量“LOG_DIR”中读取,如果没有配置LOG_DIR,则使用Kafka基目录下的logs目录作为日志目录
# create logs directory
if [ "x$LOG_DIR" = "x" ]; then
LOG_DIR="$base_dir/logs"
fi
# 如果日志目录目存在则创建日志目录
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
fi
# 获取Scala的版本号,首先从环境变量 SCALA_VERSION 中读取,如果没有配置,则使用默认值 2.10.4
if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.10.4
fi
# 获取Scala库的版本号,首先从环境变量 SCALA_BINARY_VERSION 中读取,如果没有配置,则使用默认值 2.10
if [ -z "$SCALA_BINARY_VERSION" ]; then
SCALA_BINARY_VERSION=2.10
fi
# 这里开始加载各种依赖的jar包,并将这些jar包添加到CLASSPATH环境变量中,由此也可以看出运行完整的Kafka服务(支持各种consumer/producer)需要依赖的jar包
# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
# 将Kafka依赖Scala的jar包添加到CLASSPATH中
for file in $base_dir/core/build/dependant-libs-${SCALA_VERSION}*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka的示例jar添加到CLASSPATH中
for file in $base_dir/examples/build/libs//kafka-examples*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将kafka的hadoop consumer相关jar包添加到CLASSPATH中
for file in $base_dir/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka的hadoop producer相关jar包添加到CLASSPATH中
for file in $base_dir/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka客户端相关的jar包添加到CLASSPATH中
for file in $base_dir/clients/build/libs/kafka-clients*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka的libs下的jar包添加到CLASSPATH中
# classpath addition for release
for file in $base_dir/libs/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka依赖的Scala对应版本的库添加到CLASSPATH中
for file in $base_dir/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 以下是Java管理扩展的设置
# 如果没有在环境变量中设置KAFKA_JMX_OPTS,则将Kafka的JMX配置关闭
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# 如果设置了KAFKA_JMX_OPTS环境变量,则利用这个值来设置变量KAFKA_JMX_OPTS的值,该值用于指定虚拟机的信息
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
# Log4j的配置
# Log4j settings 如果环境变量中没有设置KAFKA_LOG4J_OPTS,则使用Kafka基目录下conf/tools-log4j.properties来设置KAFKA_LOG4J_OPTS变量
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/tools-log4j.properties"
fi
# 根据环境变量LOG_DIR和KAFKA_LOG4J_OPTS来生成变量KAFKA_LOG4J_OPTS的新的值
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
# 判断环境变量KAFKA_OPTS是否有相关设置
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS=""
fi
# 判断环境变量JAVA_HOME中是否有值,如果不存在则使用默认的java,如果有,则使用该目录下指定的java
# Which java to use
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
# Kafka的内存配置,如果环境变量KAFKA_HEAP_OPTS的值为空,则设置值为默认值-Xmx256M
# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; then
KAFKA_HEAP_OPTS="-Xmx256M"
fi
# 如果没有设置环境变量KAFKA_JVM-PERFORMANCE_OPTS,则使用默认值进行配置
# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
# 这里对脚本传入的参数进行解析,提取守护进程名/是否后台运行/GC日志这个三个信息
# 第一个case,如果循环到了-name参数,则读取-name的下一参数,下一个参数必定是后台进程的名字,而且控制台的输出日志文件也是该名字
# 第二个case,如果循环到了-loggc,则表示要记录GC日志,记录GC日志的另一个要求是配置KAFKA_GC_LOG_OPTS环境变量
# 第三个case,如果循环到了-daemon,则表示服务以后台进程的方式运行
while [ $# -gt 0 ]; do
COMMAND=$1
case $COMMAND in
-name)
DAEMON_NAME=$2
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
shift 2
;;
-loggc)
if [ -z "$KAFKA_GC_LOG_OPTS"] ; then
GC_LOG_ENABLED="true"
fi
shift
;;
-daemon)
DAEMON_MODE="true"
shift
;;
*)
break
;;
esac
done
# 如果启用了GC日志,GC日志的名字为后台进程的名字[-name指定]-gc.log。
# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
fi
# 启动Java进程,将上面的所有信息整合在一起,使用指定的Java,还有各种参数,这里区分了运行模式,其实就是将进程作为后台进程运行还是前台进程运行而已
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

kafka-topics.sh

kafka-topics.sh是用来操作Kafka的Topic的脚本,其内部通过kafka-run-class.sh脚本来调用kafka.admin.TopicCommand来实现Topic的操作。

1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@