Baimoon's Note


  • 首页

  • 分类

  • 归档

  • 标签

spark-2.11-ExecutorRunnable

发表于 2018-10-19   |   分类于 spark 2.11

本文是对 org.apache.spark.deploy.yarn.ExecutorRunnable 源码进行学习的分析,spark版本为2.11。

概述

这个方法就是用来启动container的。准备环境、生成命令,发送给NMClient。

NMClient

NMClient是Node Manager的客户端。一下是一些常用的方法。

方法名 作用
public static NMClient createNMClient() 创建一个 NMClient实例
public void init(Configuration conf) 初始化 NMClient
public void start() 启动服务
public Map startContainer(Container container, ContainerLaunchContext containerLaunchContext) throws YarnException, IOException 启动一个分配的 contianer

spark 2.11 ApplicationMaster

发表于 2018-10-17   |   分类于 spark 2.11

本文是对 org.apache.spark.deploy.yarn.ApplicationMaster 源码进行学习的分析,spark的版本为2.11。

概述

ApplicationMaster 可以说是运行用户程序的入口类。该类的一些行为有解析用户参数、启动dirver、建立与driver的通信、启动reporter线程、启动用户类等。

主要方法分析

ApplicationMaster伴生类的main方法

该方法是ApplicationMaster的启动入口方法,方法定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def main(args: Array[String]): Unit = {
// 注册日志
SignalUtils.registerLogger(log)
// 实例化 ApplicationMasterArguments对应,用来解析传入的参数
val amArgs = new ApplicationMasterArguments(args)
// 是否设置了 --properties-file 参数,如果设置,则将properties文件中的配置加载到系统参数中
if (amArgs.propertiesFile != null) {
Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
sys.props(k) = v
}
}
// 以指定的用户运行 ApplicationMaster
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient)
// 运行 application master
System.exit(master.run())
}
}

从代码可以看出,该方法就是类的启动方法,注册日志、解析传入参数(如果参数传递了properties文件,则将properties中的配置加载到系统中),最后以特殊的用户身份启动ApplicationMaster(调用run方法)。

run

该方法用来启动applicationMaster,方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
final def run(): Int = {
try {
// 获取application的id
val appAttemptId = client.getAttemptId()
var attemptID: Option[String] = None
// 是否是集群模式,则设置集群模式需要的一些属性
if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
// Set the master and deploy mode property to match the requested mode.
System.setProperty("spark.master", "yarn")
System.setProperty("spark.submit.deployMode", "cluster")
// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
attemptID = Option(appAttemptId.getAttemptId.toString)
}
// <1>
// 设置调用上下文,从spark.log.callerContext配置中读取
new CallerContext(
"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
logInfo("ApplicationAttemptId: " + appAttemptId)
// This shutdown hook should run *after* the SparkContext is shut down.
// 设置钩子函数,以便在 SparkContext 之后调用,进行操作
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
//
ShutdownHookManager.addShutdownHook(priority) { () =>
val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
if (!finished) {
// The default state of ApplicationMaster is failed if it is invoked by shut down hook.
// This behavior is different compared to 1.x version.
// If user application is exited ahead of time by calling System.exit(N), here mark
// this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
// System.exit(0) to terminate the application.
finish(finalStatus,
ApplicationMaster.EXIT_EARLY,
"Shutdown hook called before final status was reported.")
}
if (!unregistered) {
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
unregister(finalStatus, finalMsg)
cleanupStagingDir()
}
}
}
// <2>
// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
// 根据spark配置生成安全管理器
val securityMgr = new SecurityManager(sparkConf)
// If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer
// spark.yarn.credentials.file
if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
credentialRenewer =
new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
credentialRenewer.scheduleLoginFromKeytab()
}
// <3>
// 根据不同的集群模式,调用不同的方法
if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
// <4>
} catch {
case e: Exception =>
// catch everything else if not specifically handled
logError("Uncaught exception: ", e)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + e)
}
exitCode
}

<1> 这部分用来设置一些系统属性,以便后面使用。

<2> 设置CurrentContext以及钩子方法,以便在sparkContext销毁之后进行清理操作。钩子函数实际上是交给了 SparkShutdownHookManager 对象进行处理。

<3> 进行安全方面的一些设置,需要后续仔细看。

<4> 进行服务的启动,这里区分是集群模式(cluster)还是客户端模式(client)。其内部执行的总体步骤是一样的,只是每个步骤的做的方式不同。
接下来,先从集群模式来看,然后再看客户端模式。

runDriver

运行deiver,只有集群模式,才会执行这个方法,方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private def runDriver(securityMgr: SecurityManager): Unit = {
// 添加IP过滤器
addAmIpFilter()
// 启动用户Application,就是利用反射机制,运行用户指定的class中的main方法
userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
logInfo("Waiting for spark context initialization...")
// spark.yarn.am.waitTime
val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
try {
// 等待 SparkContext的生成,最多等待 totalWaitTime 毫秒
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
// 启动 driver RPC
val driverRef = runAMEndpoint(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
// 注册Application master
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
if (!finished) {
throw new IllegalStateException("SparkContext is null but app is still running!")
}
}
// 等待 用户线程 的完成
userClassThread.join()
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
logError(
s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
"Please check earlier log output for errors. Failing the application.")
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
}
}

这个方法比较重要,因此需要详细看看。addAmIpFilter方法,从代码来看是给Spark UI增加IP 过滤器的功能,方法中也会对不同的部署模式有不同的区分,对于集群模式,将过滤器信息(过滤器类和参数)设置到系统参数,而对于client模式,则通过RPC服务发送给了driver。由此可见对于集群模式,driver是运行在本地的,而客户端模式,driver是运行在别处的。运行在哪里呢?另外,addAmIpFilter实际上添加的是 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 这个Filter。
接下来,方法是在独立的线程中启动了用户类(通过–class参数传入的类),启动用户类,其大概的操作就是获取类加载器,利用反射,加载类并最终调用用户类的main方法。
接着,获取SparkContext,并通过runAMEndpoint方法得到 driverEndpint,driverEndpoint作为一个参数来向ResourceManager 注册 ApplicationMaster。
然后就是向 ResourceManager 注册 ApplcationMaster。
最后等待用户类的执行完成。

runExecutorLauncher

runExecutorLauncher方法与上面的rundirver的地位相同,只是针对client模式的启动方式。方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
val port = sparkConf.get(AM_PORT)
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
clientMode = true)
val driverRef = waitForSparkDriver()
addAmIpFilter()
registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
securityMgr)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
}

此方法相对 rundriver来说就简单多了,首先创建了一个用于连接本机的RpcEnv,然后是等待SparkDriver的启动完成,添加IP Filter(通过amEndpoint发送给driver)。最后向ResourceManager 注册 ApplicationMaster。

registerAM

此方法用来处理向 ResourceManager 注册 ApplicationMaster。通过这个方法,就将ApplicationMaster与YarnRMClient和YarnAllocator联系起来了。方法定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private def registerAM(
_sparkConf: SparkConf,
_rpcEnv: RpcEnv,
driverRef: RpcEndpointRef,
uiAddress: Option[String],
securityMgr: SecurityManager) = {
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress =
_sparkConf.get(HISTORY_SERVER_ADDRESS)
.map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
.map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
.getOrElse("")
val driverUrl = RpcEndpointAddress(
_sparkConf.get("spark.driver.host"),
_sparkConf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
// Use placeholders for information that changes such as executor IDs.
logInfo {
val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
dummyRunner.launchContextDebugInfo()
}
allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
securityMgr,
localResources)
allocator.allocateResources()
// 在 客户端 模式中 的 runExecutorLauncher 方法中join
reporterThread = launchReporterThread()
}

该方法主要就是获取各种参数,然后调用client(YarnRMClient)的register方法进行注册(说是向ResourceManager注册的application,但以我来看,是注册的driver),还有就是启动reporter线程。

launchReporterThread

用于生成并启动 reporter线程。方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
private def launchReporterThread(): Thread = {
// The number of failures in a row until Reporter thread give up
// 获取配置的 reporter 线程最大失败次数
val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
val t = new Thread {
override def run() {
var failureCount = 0
while (!finished) {
try {
// 如果 allocator 失败的 executor 个数已经超出了设置的最大值,则 终止 application 的运行(终止用户类的运行)
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
s"Max number of executor failures ($maxNumExecutorFailures) reached")
} else {
// 向YarnAllocator 申请资源
logDebug("Sending progress")
allocator.allocateResources()
}
failureCount = 0
} catch {
// 不同的异常不同的处理,中断异常,有可能是 finish 方法抛出来的
case i: InterruptedException => // do nothing
case e: ApplicationAttemptNotFoundException =>
failureCount += 1
logError("Exception from Reporter thread.", e)
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
e.getMessage)
// 如果 reporter 线程的尝试次数超过配置的最大值,则终止 用户类的运行
case e: Throwable =>
failureCount += 1
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"$failureCount time(s) from Reporter thread.")
} else {
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
var sleepStart = 0L
var sleepInterval = 200L // ms
allocatorLock.synchronized {
// 计算 reporter 的休眠时长, 根据
sleepInterval =
if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
// 进入这里,表示有丢失的container, 也有正在添加的container
val currentAllocationInterval =
math.min(heartbeatInterval, nextAllocationInterval)
nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
currentAllocationInterval
} else {
nextAllocationInterval = initialAllocationInterval
heartbeatInterval
}
sleepStart = System.currentTimeMillis()
allocatorLock.wait(sleepInterval)
}
val sleepDuration = System.currentTimeMillis() - sleepStart
// 如果符合这个条件,说明上面的 allocatorLock.wait所等待的时间不够,改用Thread.sleep来休眠
if (sleepDuration < sleepInterval) {
// log when sleep is interrupted
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Slept for $sleepDuration/$sleepInterval ms.")
// if sleep was less than the minimum interval, sleep for the rest of it
val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
if (toSleep > 0) {
logDebug(s"Going back to sleep for $toSleep ms")
// use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
// by the methods that signal allocatorLock because this is just finishing the min
// sleep interval, which should happen even if this is signalled again.
Thread.sleep(toSleep)
}
} else {
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Slept for $sleepDuration/$sleepInterval.")
}
} catch {
case e: InterruptedException =>
}
}
}
}
// setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.setName("Reporter")
t.start()
logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
s"initial allocation : $initialAllocationInterval) intervals")
t
}

该方法看起来很复杂,其实它主要要做的事情就是 向YarnAllocator申请资源(调用allocateResources方法)。其他代码就是判断是否还要申请资源,以及什么时候进行下一次申请。

startUserApplication

启动用户应用程序,其实就是启动用户通过 –class参数传递过来的类。方法定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
// 获取用户的类路经
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
val userClassLoader =
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
var userArgs = args.userArgs
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
// When running pyspark, the app is run using PythonRunner. The second argument is the list
// of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
userArgs = Seq(args.primaryPyFile, "") ++ userArgs
}
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
// TODO(davies): add R dependencies here
}
// 用户类的main方法
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run() {
try {
// 在新的线程中运行 用户类的 main方法
mainMethod.invoke(null, userArgs.toArray)
// 设置完成状态(用户类执行完成)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
// 记录日志
logDebug("Done running users class")
} catch {
// 根据异常的原因,设置用户类执行的完成状态(失败状态)
// 异常的原因分为
// 中断异常
// app异常
// 代码异常
case e: InvocationTargetException =>
e.getCause match {
case _: InterruptedException =>
// Reporter thread can interrupt to stop user class
case SparkUserAppException(exitCode) =>
val msg = s"User application exited with status $exitCode"
logError(msg)
finish(FinalApplicationStatus.FAILED, exitCode, msg)
case cause: Throwable =>
logError("User class threw exception: " + cause, cause)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
"User class threw exception: " + cause)
}
sparkContextPromise.tryFailure(e.getCause())
} finally {
// Notify the thread waiting for the SparkContext, in case the application did not
// instantiate one. This will do nothing when the user code instantiates a SparkContext
// (with the correct master), or when the user code throws an exception (due to the
// tryFailure above).
sparkContextPromise.trySuccess(null)
}
}
}
// 设置线程的 类加载器 、 线程名字, 启动线程
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
}

首先,通过代码我们可以明确一个问题,那就是用户的程序,其实就是driver。对于这个方法实现的功能,简单来说,就是获取类路径、获取类加载器、实例话用户类、执行用户类的main方法。

spark 2.11 ApplicationMasterarguments

发表于 2018-10-12   |   分类于 spark 2.11

本文是对 org.apache.spark.deploy.yarn.ApplicationMasterArguments 源码学习的分析,spark的版本为2.11。

概述

org.apache.spark.deploy.yarn.ApplicationMasterArguments类主要用来对ApplicationMaster参数进行解析。

主要方法分析

parseArgs

该方法就是用来解析参数的。方法的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs
// 从这个匹配可以看出,可以使用的参数列在下面,如果包含了其他参数,系统会异常退出
// --jar jar包
// --class 类
// --primary-py-file PYTHON语言编写的application
// --primary-r-file R语言编写的application
// --arg 其他参数,多个参数需要使用多个 --arg 1 --arg "name"
// --properties-file 配置文件
while (!args.isEmpty) {
// --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
// the properties with executor in their names are preferred.
// 开始解析 类参数 case ("--jar") :: value :: tail 就是提取参数和参数名 ,并把剩余的参数放到 tail中
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail
case ("--class") :: value :: tail =>
userClass = value
args = tail
case ("--primary-py-file") :: value :: tail =>
primaryPyFile = value
args = tail
case ("--primary-r-file") :: value :: tail =>
primaryRFile = value
args = tail
case ("--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
case ("--properties-file") :: value :: tail =>
propertiesFile = value
args = tail
case _ =>
printUsageAndExit(1, args)
}
}
if (primaryPyFile != null && primaryRFile != null) {
// scalastyle:off println
System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
// scalastyle:on println
System.exit(-1)
}
userArgs = userArgsBuffer.toList
}

这个方法对ApplicationMaster参数进行解析,通过方法中match…case判断代码,我们可以看出ApplicationMaster允许的参数只有 6 个,如果包含其他名称的参数则会异常退出,并且参数–primary-py-file 和 参数–primary-r-file 不允许同时出现。对于上面的match…case的分析,见章节结尾部分。

printUsageAndExit

该方法用来将ApplicationMaster的使用参数信息进行打印。方法定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
// scalastyle:off println
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
System.err.println("""
|Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
|Options:
| --jar JAR_PATH Path to your application's JAR file
| --class CLASS_NAME Name of your application's main class
| --primary-py-file A main Python file
| --primary-r-file A main R file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --properties-file FILE Path to a custom Spark properties file.
""".stripMargin)
// scalastyle:on println
System.exit(exitCode)
}

问题分析

参数判断的match … case

首先看代码

1
2
3
4
5
6
7
8
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail
case ("--class") :: value :: tail =>
userClass = value
args = tail

case中的信息其实就是匹配模式,这里,如“(“–jar”) :: value :: tail”,其实就是在args开头匹配 “–jar” 参数,也就是如果args中的第一个值为”–jar“,那么将args的第二个值赋值给value,最后将剩余的值放到 tail中,但是需要注意的是,这个模式是从args的第一个元素开始的,如果第二元素是“–jar”,是不符合条件的。

spark 2.11 YarnRMClient

发表于 2018-10-11   |   分类于 spark 2.11

本文是对 org.apache.spark.deploy.yarn.YarnRMClient 源码进行学习的分析,spark的版本为2.11。

概述

YarnRMClient主要用来处理application master向Yarn resourceManager的注册和注销。

主要方法分析

register

该方法很简单,就是向YARN ResourceManager注册application master,该方法会在 ApplicationMaster的registerAM方法中调用。具体方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def register(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator = {
// 调用AMRMClient自身的方法来生成AMRMClient,再使用 Yarn 配置进行初始化,启动AMRMClient
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}
logInfo("Registering the ApplicationMaster")
// 向 ResourceManager 注册 application master,从代码看出 application master就是本机,TODO 这个本机是啥呢???
synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
registered = true
}
// 生成 YarnAllocator
// driverUrl和driverRef需要说一下,
// driverUrl,是driver运行的地址,会传递给Executor,应该是用于Execurot与driver进行交互
// driverRef 在YarnAllocator中使用,用于同步executor的id,以及 发送删除executor的信息
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
}

此方法逻辑很简单,一眼就看明白。生成AMRMClient(用于访问ResourceManager),向ResourceManager注册applicationMaster,生成YarnAllocator。但是需要注意生成YarnAllocator的参数。

unregister

作用与register方法相反,从YARN ResourceManager中注销 application master。具体方法实现

1
2
3
4
5
def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
if (registered) {
amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
}
}

getMaxRegAttempts

此方法就是用来定义注册application master的最大尝试次数。具体方法定义

1
2
3
4
5
6
7
8
9
10
/** 获取注册AM的最大尝试次数 分别从spark配置和yarn配置中读取 如果spark配置中设置了,则使用spark和yarn配置中最小那个值 */
def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
val yarnMaxAttempts = yarnConf.getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
sparkMaxAttempts match {
case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
case None => yarnMaxAttempts
}
}

此方法也很简单,分别从spark配置和yarn配置中读取 如果spark配置中设置了,则使用spark和yarn配置中最小那个值。没有在spark中配置,则使用yarn配置中的。

问题

哪里生成YarnRMClient对象

答案就是在ApplicationMaster的main方法中,代码:

1
2
3
4
5
6
7
8
9
def main(args: Array[String]): Unit = {
...
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
}
}

哪里调用 YarnRMClient的register方法

在register方法中看到了YarnAllocator的生成,那么在哪里调用register方法呢?答案就是 org.apache.spark.deploy.yarn.ApplicationMaster中。而且ApplicationMaster含有main方法,是程序的入口。代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private def registerAM(
_sparkConf: SparkConf,
_rpcEnv: RpcEnv,
driverRef: RpcEndpointRef,
uiAddress: Option[String],
securityMgr: SecurityManager) = {
...
allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
securityMgr,
localResources)
allocator.allocateResources()
reporterThread = launchReporterThread()
}

spark 2.11 YarnAllocator

发表于 2018-10-10   |   分类于 spark 2.11

本文是对 org.apache.spark.deploy.yarn.YarnAllocator 类源码进行学习的分析,spark的版本为2.11。

总体概述

YarnAllocator可以理解成一个Container的筛选器。当调用了YarnAllocator.allocateResources()方法后,程序就会进行各种处理,最终调用ExecutorRunnable类来启动Executor。在YarnAllocator类中,最主要的方法有:allocateResources()、updateResourceRequests()、handleAllocatedContainers()、runAllocatedContainers()和processCompletedContainers()。而整个这些方法的调用,是通过allocateResources()来调用的。基本的流程如下图:

主要方法的分析

allocateResources

资源分配的入口,首先看方法的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def allocateResources(): Unit = synchronized {
updateResourceRequests()
// 处理指示器
val progressIndicator = 0.1f
// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
// requests.
// 调用 AMRMClient 分配资源
val allocateResponse = amClient.allocate(progressIndicator)
// 得到已经分配的 container
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
// 输出日志信息,包括 分配的container数量,正在运行的以及启动的executor数量,以及可用的资源信息
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
numExecutorsRunning.get,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))
// 处理已经分配的container
handleAllocatedContainers(allocatedContainers.asScala)
}
// 获取已经执行完成的 container
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
//处理已经完成的container
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning.get))
}
}

这个方法首先就是要更新资源的申请(调用updateResourceRequests()方法,我们稍后再看),然后就是调用AMRMClient(amClient)来分配资源,分配资源的返回值(allocateResponse)会包含三部分信息:已经分配的Container(allocatedContainers)、可用的资源和完成的Container(completedContainers)。对于已经分配的和完成的Container,会有对应的方法去处理;对于可用的资源,只是输出到日志。

updateResourceRequests

更新资源的请求信息,首先看方法的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def updateResourceRequests(): Unit = {
// 得到正在添加的container 并 得到正在添加 container的数量
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
// 计划要启动的executor数量 - 正在请求启动的executor - 已经启动的executor - 已经运行任务的executor = 缺少多少executor
// 所以这里是在计算 比预计要启动的executor,还缺少多少个
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - numExecutorsRunning.get
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
s"executorsStarting: ${numExecutorsStarting.get}")
// <1>
// missing 可以为正数 也可能为负数,负数则说明 动态分配分配多了,但是没有超过最大个数,这个数是通过计划启动个数算出来的
if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
// 将要添加的container请求拆分到三个组:位置匹配列表、位置不匹配列表 和 无位置列表。
// 对于位置匹配的 container 请求,将他们放到可用的地方,等待分配
// 对于位置不匹配的和无位置的container请求,取消这些container的请求,因为 位置优先权已经变了,
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
// <2>
// cancel "stale" requests for locations that are no longer needed
// 对于位置不匹配的container,进行移除操作,并记录日志, 移除了N个container
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
if (cancelledContainers > 0) {
logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
}
// <3>
// 计算还可以分配的container的数量,就是 缺少的 + 取消的
// 因为cancelledContainers的个数实际上就是从pendingAllocate 中取消的
val availableContainers = missing + cancelledContainers
// 计算潜在的container就是 可以分配的container + 上面那些 不限制位置的的contianer的数量
val potentialContainers = availableContainers + anyHostRequests.size
// TODO 这是在弄啥 ??? 应该是根据 潜在container的数量,生成对应个的contaner的位置引用
val containerLocalityPreferences = if (labelExpression.isEmpty) {
containerPlacementStrategy.localityOfRequestedContainers(
potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
allocatedHostToContainersMap, localRequests)
} else {
Array.empty[ContainerLocalityPreferences]
}
// 根据上面的containerLocalityPreferences 创建container添加请求(注意是请求)
// newLocalityRequest 用来记录要添加的container的请求
val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
containerLocalityPreferences.foreach {
// createContainerRequest 用来生成container的创建请求
case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
newLocalityRequests += createContainerRequest(resource, nodes, racks)
case _ =>
}
// 需要再次判断 总的container的数量,如果availableContainers > newLocalityRequests 表示,还不够
// 为啥 availableContainers 会大于 newLocalityRequests ? 因为 labelExpression.isEmpty 为空时,会生成一个空的Array
if (availableContainers >= newLocalityRequests.size) {
// more containers are available than needed for locality, fill in requests for any host
for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
// createContainerRequest 用来生成container的创建请求
newLocalityRequests += createContainerRequest(resource, null, null)
}
} else {
// 这里的 newLocalityRequests 实际上对应的个是 potentialContainers = availableContainers + anyHostRequests.size
// 因此,如果 newLocalityRequests > availableContainers 则表示生成多了,且 anyHostRequests 的多了
val numToCancel = newLocalityRequests.size - availableContainers
// 因此释放到一些多余的 anyHostRequests
// cancel some requests without locality preferences to schedule more local containers
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
if (numToCancel > 0) {
logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
}
}
// <4>
// AMRMClient 请求添加container
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
}
// <5>
if (log.isInfoEnabled()) {
val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
if (anyHost.nonEmpty) {
logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
}
localized.foreach { request =>
logInfo(s"Submitted container request for host ${hostStr(request)}.")
}
}
// <6>
} else if (numPendingAllocate > 0 && missing < 0) {
// 如果不缺少executor,并且还有正在添加的executor
// 计算要取消的contaner的数量,为什么是最小值,个人这样理解:-missing,其实是多出来的,但是在计算missing的时候,
// 已经减去 numPendingAllocate 了,也就是说 numPendingAllocate 认为是已经使用的数量
// 因此,如果取最大值,那么当 numPendingAllocate > -missing 时,删除的container就太多了
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")
// 获取匹配的请求,并且有匹配的数据,则移除这些container(而且我猜测,这里的寻找匹配的请求,是在pending的队列中找的)
val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.iterator().next().asScala
.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
// <7>
}
}

<1> 部分主要就是计算系统是否已经达到了目标个数的executor(container是executor的容器,目前一个container只包含了一个executor),并计算缺少的个数。计算的公式就是val missing = targetNumExecutors - numPendingAllocate - numExecutorsStarting.get - numExecutorsRunning.get。targetNumExecutors是配置中 spark.dynamicAllocation.minExecutors 、spark.dynamicAllocation.initialExecutors 以及 spark.executor.instances 三个配置中的最大值(开启了动态分配的话,如果没开启,则使用 spark.executor.instances 的值),并且要求 spark.dynamicAllocation.maxExecutors >= spark.dynamicAllocation.initialExecutors >= spark.dynamicAllocation.minExecutors(如果没有开启动态分配,则不存在这种要求)。这里涉及了4个配置项。numPendingAllocate的值调用AMRMClient的getMatchingRequests方法,获取location为*的所有请求。

<2> 这一部分就是对添加中的请求(pendingAllocate)进行分类,分为位置自由的、位置匹配的和位置不匹配的。分类的依据是ContainerRequest.getNodes。如果nodes为空,则认为是位置自由的,如果nodes不空,则判断nodes是否hostToLocalTaskCounts的keyset有交集,如果有则认为是匹配的,否则不匹配。这里的匹配和不匹配,大概就是寻找本地的containerRequest(????????????????)。

<3> 这一部分是对上面找出来的位置不匹配的请求,进行取消,并记录日志。筛选的规则是优先使用位置符合的,坚决不使用位置不符合的,数量不够的时候,使用位置自由的。

<4> 这一部分是计算出需要创建的container个数。进行ContainerRequest的创建,首先创建位置符合的,然后创建位置自由的,并且将多余的位置自由的请求取消掉。

<5> 在这里调用AMRMClient的addContainerRequest方法来增加ContainerRequest。

<6> 记录日志

<7> 将超出目标个数的container(位置自由的)取消。
所以从总体来看,这个方法其实就是把container的个数控制在目标个数范围内,如果缺少了,则增加,如果多了,则取消一些请求。

handleAllocatedContainers

此方法用来处理申请资源的container。方法的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
// 用来保存要使用的Containers
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host 根据host进行匹配,匹配不成功的下面继续匹配
val remainingAfterHostMatches = new ArrayBuffer[Container]
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
// Match remaining by rack 对上面按照host匹配不成功的进行机架匹配
val remainingAfterRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
// 通过上面的两次匹配可以看到,最优先使用的是 host -> rack -> *
// Assign remaining that are neither node-local nor rack-local
val remainingAfterOffRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterRackMatches) {
matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
remainingAfterOffRackMatches)
}
// 三次匹配都没有成功的,就释放掉了
if (!remainingAfterOffRackMatches.isEmpty) {
logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
s"allocated to us")
for (container <- remainingAfterOffRackMatches) {
internalReleaseContainer(container)
}
}
// 启动所有的container,这些container上经过上面过滤后的可以使用container
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}

从代码来看,这个方法其实就是对allocatedContainers再次进行过滤,根据主机、机架 和 通配主机(*)。最后将不匹配的释放掉(不同于取消请求)。并在方法的最后启动所有分配的container启动。

runAllocatedContainers

运行 container,方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
// executorIdConter 应该是同步的,用来生成新的executor的id
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId")
//<1>
// 主要是为了更新各种计数
def updateInternalState(): Unit = synchronized {
// 正在运行的executor加一
numExecutorsRunning.incrementAndGet()
// 启动中的executor 减一,因为已经运行了,就不能处于启动中了
numExecutorsStarting.decrementAndGet()
//保存 executor -> container的关系 以及 container -> executorId的关系
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
// 记录 某个主机上都有哪些 containers
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
containerSet += containerId
// 记录contain运行在那个主机上
allocatedContainerToHostMap.put(containerId, executorHostname)
}
// <2>
// 如果正在运行的executor的数量小于 想要启动的executor个数
if (numExecutorsRunning.get < targetNumExecutors) {
// 将启动的executor的计数加一
// 这里为啥要加一呢???
// 因为 接下来要启动 container了,没有启动起来之前则认为处于启动中,因此先加一,一旦启动完成,则调用上面的updateInternalState
// 方法,在这个方法中会对运行中的executor加一,启动中的executor减一。
numExecutorsStarting.incrementAndGet()
// spark.yarn.launchContainers 配置的值
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
// 调用 ExecutorRunnalbe方法来启动container
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
// 跟新各种计数
updateInternalState()
} catch {
case e: Throwable =>
numExecutorsStarting.decrementAndGet()
if (NonFatal(e)) {
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately
// to avoid unnecessary resource occupation.
// 启动失败,则释放掉这个container
amClient.releaseAssignedContainer(containerId)
} else {
throw e
}
}
}
})
} else {
// For test only
updateInternalState()
}
} else {
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
"reached target Executors count: %d.").format(
numExecutorsRunning.get, targetNumExecutors))
}
}
// <3>
}

<1> 获取container和executor的信息,方便下面使用。

<2> 定义了一个方法,这个方法就是用来更新各种数据关系,有executor运行的数量、executor启动的数量、executorId到container的对应关系、containerId到executorId的对应关系、host上运行的contianer的对应关系、container在哪个host运行的关系。

<3> 异步生成ExecutorRunnable,并启动。

processCompletedContainers

此方法也在allocateResources()方法中调用,用来处理完成的container,完成的container可能是被kill掉,也可能是正常完成的。方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
// 循环完成的container
for (completedContainer <- completedContainers) {
// 获取containerId, 判断container是否在releasedContainers 中,获取 container所在的host
val containerId = completedContainer.getContainerId
val alreadyReleased = releasedContainers.remove(containerId)
val hostOpt = allocatedContainerToHostMap.get(containerId)
val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
// <1>
// 如果alreadyReleased为false,则表示releasedContainers 中没有这个container,或者是有但是删除失败
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,
completedContainer.getState,
completedContainer.getExitStatus))
// 获取container的退出状态
val exitStatus = completedContainer.getExitStatus
// 根据退出状态来判断是否是由于Application的原因
// 以及退出的原因
// ContainerExitStatus.SUCCESS 表示是因为YARN事件,不是因为运行的job发生error而导致的
// ContainerExitStatus.PREEMPTED 表示主机上的端口被占用了
// VMEM_EXCEEDED_EXIT_CODE 表示 虚拟内存的问题
// PMEM_EXCEEDED_EXIT_CODE 表示 物理内存的问题
// 否则
// 除了第一种和第二种,其他的认为都属于 Application的原因
val (exitCausedByApp, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
(false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
"pre-emption) and not because of an error in the running job.")
case ContainerExitStatus.PREEMPTED =>
// Preemption is not the fault of the running tasks, since YARN preempts containers
// merely to do resource sharing, and tasks that fail due to preempted executors could
// just as easily finish on any other executor. See SPARK-8167.
(false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))
case PMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case _ =>
// Enqueue the timestamp of failed executor
failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
}
// 如果是由于Application的原因退出的,则以警告记录日志,否则以info级别记录日志
if (exitCausedByApp) {
logWarning(containerExitReason)
} else {
logInfo(containerExitReason)
}
// 生成一个ExecutorExited 对象并返回给变量
ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
} else {
// 如果 alreadyReleased 为true,则表示 container已经被kill掉了,在 internalReleaseContainer 方法中会操作
ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
s"Container $containerId exited from explicit termination request.")
}
// <2>
for {
host <- hostOpt
// 根据host获取上面所运行的container
containerSet <- allocatedHostToContainersMap.get(host)
} {
// 将container从host所包含的container集合中删除,这样host上的container集合就含有这个container了
containerSet.remove(containerId)
// 删除完成后,如果 container 集合列表空了,则说明 host上只运行了这一个contianer,则删除host与container的对应关系,否则就更新一下
if (containerSet.isEmpty) {
allocatedHostToContainersMap.remove(host)
} else {
allocatedHostToContainersMap.update(host, containerSet)
}
// 将container 到 host的映射关系也删除
allocatedContainerToHostMap.remove(containerId)
}
// 移除 container到executor的对应关系
containerIdToExecutorId.remove(containerId).foreach { eid =>
// 将executor到container的对应关系也删除
executorIdToContainer.remove(eid)
// 从 pendingLossReasonRequests 移除 executor
pendingLossReasonRequests.remove(eid) match {
case Some(pendingRequests) =>
// Notify application of executor loss reasons so it can decide whether it should abort
pendingRequests.foreach(_.reply(exitReason))
case None =>
releasedExecutorLossReasons.put(eid, exitReason)
}
// <3>
// 如果没有被kill掉
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
// Notify backend about the failure of the executor
// container非异常释放的计数加一
numUnexpectedContainerRelease += 1
// 发送删除 execurot的请求
driverRef.send(RemoveExecutor(eid, exitReason))
}
// <4>
}
}
}

<1> 获取containerId, 判断container是否在releasedContainers 中,获取 container所在的host,以便后面使用。

<2> 分析contianer退出的原因,退出的类型分为 App引发的和非App引发的。具体的判断,可以看代码和注释。

<3> 主要为了清理container各种关系的保存信息。

<4> 使用Netty RPC发送移除Executor的信息。

总结

从上面的代码的调用过程以及实现我们可以看出,YarnAllocator实际上类似一个过滤器,它会从Resource Manager那里申请资源(通过AMRMClient获取),对获取到资源按照host -> stack -> any的顺序筛选container,并将合适的container启动后,反馈给调用者。
这个类功能很简单,但是在整个集群中又比较不太好理解,首先需要确定的是,YarnAllocator自己不管理资源(不对资源进行操作),只是筛选,虽然也会有释放、取消操作,但是这些操作都是调用Resource Manager的api来完成的。
另外,YarnAllocator是运行在driver上的,由AM来调用(?????需要再次确认),因此,如果每个application都会有自己的YarnAllocator,它只是为自己的job的运行筛选container,而不是全局为所有的application统一筛选。

AMRMClient的一些方法

方法名 作用
AllocateResponse allocate(float progressIndicator) 请求额外的container并接收新的container的分配。
List<? extends Collection> getMatchingRequests(Priority priority, String resourceName, Resource capability) 获取与给定参数匹配的未完成的请求。
void removeContainerRequest(T req) 移除之前的container请求。
void addContainerRequest(T req) 在调用allocate之前为container申请资源。
void releaseAssignedContainer(ContainerId containerId) 释放由Resource Manager分配的container。
RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) 注册application Master

一些问题

YarnAllocator是什么时候生成的

是在YarnRMClient的register方法中,向AMRMClient注册ApplicationMaster(调用AMRMClient.registerApplicationMaster方法)完成之后生成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def register(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}
logInfo("Registering the ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
registered = true
}
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
}

Spark Resource Study

发表于 2018-09-17

Core

Partition 分区

在Partition特征中,只定义了index,用来表示分区在父级RDD中索引。

Dependency 依赖

依赖的定义在 org.apache.spark.Dependency类中,从代码中可以看出,Dependency有2个子类,分别代表了2中类型的依赖,分别为:NarrowDependency和ShuffleDependency。其中NarrowDependency有两个子类:OneToOneDependency和RangeDependency。

RDD

RDD是Resilient Distributed Dataset的简称,是Spark中的基本抽象。要实例化一个RDD,需要两个参数:SparkContext和Dependency列表。需要SparkContext是因为SparkContext提供了RDD的一些操作(如生成RDD的id,清理RDD的缓存、缓存RDD等),而Dependency则是因为它表示了RDD的继承关系。

1
2
3
4
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

此处可以看到RDD的基本构造方法。

1
2
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))

使用已有的RDD构造新的RDD。

此外,RDD定义了一些抽象,需要子类进行实现:

1
2
3
4
5
def compute(split: Partition, context: TaskContext): Iterator[T] -- 计算给定的分区,返回一个迭代器
protected def getPartitions: Array[Partition] -- 返回RDD的所有分区
protected def getDependencies: Seq[Dependency[_]] = deps -- 返回RDD的到父类RDD的所有依赖

SparkContext

wordCount分析

了解了一些代码之后,决定从wordCount的案例进行分析,以便了解Spark进行计算时的具体操作。

1
2
val sc = new SparkContext("master", "testApplication");
sc.hadoopFile("path", 5).map(_ => 1).count()

因为在Spark中,transform是延迟执行的,也就是说,action之前的transform只有在遇到后面的action之后,才开始执行。因此上面的代码就要从count()开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
//Utils.getIteratorSize方法,在之后的代码块中展示,其实现就是根据参数的Iterator,计算一下这个迭代器中的数据个数(猜测迭代器最终是RDD分区的迭代器)
//这里的runJob的定义是
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
//然后又指向
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
//然后继续指向
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
//继续指向
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = {
...
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

最终执行Job落到了 dagScheduler 对象身上

1
2
3
4
5
def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
...
}

dagScheduler 的runJob方法中调用submitJob来提交任务

1
2
3
4
5
6
def submitJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties):JobWaiter[U] = {
...
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
waiter
}

最终,通过eventProcessLoop的post将任务提交到了任务执行队列。 这里需要注意的一个问题,加入任务队列的是一个 JobSubmitted对象。为什么要如此处理呢?需要从eventProcessLoop对象入手。
eventProcessLoop是DAGSchedulerEventProcessLoop的实例

1
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

查看 DAGSchedulerEventProcessLoop 的定义

1
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging

DAGSchedulerEventProcessLoop继承于 EventLoop,EventLoop的内部有一个EventThread的线程,该线程从事件队列中循环获取数据

1
2
3
4
5
6
7
while (!stopped.get) {
val event = eventQueue.take()
try {
onReceive(event)
} catch {
}
}

DAGSchedulerEventProcessLoop的doOnReceive方法的定义如下

1
2
3
4
5
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
...
}

当从事件队列中获取到数据后,如果JobSubmitted对象,则调用dagScheduler的handleJobSubmitted方法。由此也知道了为什么eventProcessLoop.post()推的数据是 JobSubmitted 对象了。

再看handleJobSubmitted方法的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}

由此看到了创建步骤(createResultStage(finalRDD, func, partitions, jobId, callSite))和提交步骤(submitStage(finalStage))的代码。

???上面的代码分析过程中,我们知道整个transform的触发点是从action(count())开始的,而count是最后一个RDD(map生成的那个RDD)的方法。map对应的RDD是MapPartitionsRDD。在MapPartitionsRDD的compute方法中,而compute方法中使用的迭代器是从最开始的那个RDD开始的( firstParent[T].iterator(split, context) )。

创建步骤 createResultStage(finalRDD, func, partitions, jobId, callSite)

Python学习之optparse

发表于 2018-09-05   |   分类于 python

optparse

在使用python进行命令开发的过程中,经常需要使用的就是命令行参数了,本章节介绍一个功能强大,易于使用的内建命令行参数处理模块optparse。

简单的使用

首先需要创建一个OptionParser对象,该类属于optparse模块,因此使用前需要导入。

1
2
3
4
5
6
7
from optparse import OptionParser
parser = OptionParser(usage="usage: %prog action [options]",)
或
import optparse
parser = optparse.OptionParser(usage="usage: %prog action [options]",)

可以不写参数,参数用来指定帮助的显示信息,如果没有指定,则默认显示“usage: %prog [options]”。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def __init__(self,
usage=None,
option_list=None,
option_class=Option,
version=None,
conflict_handler="error",
description=None,
formatter=None,
add_help_option=True,
prog=None,
epilog=None):
作者:fuyoufang
链接:https://www.jianshu.com/p/bec089061742
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

add_option函数

创建OptionParser对象之后,就可以使用add_option来定义命令行参数了

1
parsser.add_option(...)

参数

add_option的前两参数分别为短参数名和长参数名。其中长参数名可以省略。

1
parser.add_option("-f", dest="fileName")

dest

定义程序内参数值的名字,之后通过这个名字来取参数的值。如果为空,则使用不加“-”的短参数名。

action

action是add_option的一个参数,用来指定解析到参数后的操作。默认值为“store”,表示将参数值保存到options对象中。

1
2
3
parser.add_option("-f", "--file", action="store", dest="filename")
(options, args) = parser.parse_args(["-f", "myfile.log"])
print options.filename

其中action值的store还可以有其他两种类型:store_ture和store_false,用来处理不带参数值的参数。

1
2
parser.add_option("-g", action="store_true", dest="isGoodUser")
parser.add_option("-s", action="store_false", dest="isGoodUser")

action可以使用的值出了store_true和store_false之外,还可以使用store、store_const、append、count、callback等。

type

type是add_option方法的一个参数,用来指定参数值的类型,默认为String。还可以指定为“int”、“float”等

1
parser.add_option("-n", type="int", dest="num")

default

通过add_option方法的default参数可以对命令行参数设置默认值。

1
parser.add_option("-f", "--file", default="test.log")

help

用于指定参数在帮助程序中显示的信息,详细请见下方的“生成帮助”章节。

metavar

metavar配合help参数,用于帮助提醒用户该命令行所期待的参数,详细请见下方的“生成帮助”章节。

choices

当type为choices时,需要设置此值。

const

指定一个常数,配合action为store_const和append_const时一起使用。

parse_args函数

一旦定义好了所有的命令行参数,就可以调用parse_args()来解析命令行。

1
(options, args) = parser.parse_args()

参数

你可以传递一个命令行参数列表给parse_args方法,否则默认使用sys.argv[:1]。
parse_args方法有两个返回值:

options optpars.Values对象,保存了所有命令行的值。只要知道命令行参数名,就可以得到。
args 一个由 positional arguments组成的列表。

set_defaults函数

除了在add_option方法中使用default参数设置默认值,还可以使用set_default函数,来统一设置默认值。该方法应该在所有add_option函数之前调用。

1
parser.set_default(filename="test.log", isGoodUser=False)

set_defaults函数,可以用来设置多个默认值。

has_option方法

用来检查是否有相应的选项

1
print parser.has_option("fileName")

remove_option()

用来删除相应的选项。

1
print parser.remove_option("fileName")

生成帮助

optparse的另一个方便的功能便是自动生成帮助,而你需要做的事情就是在调用add_option方法时指定help参数。

1
2
parser = optparse.OptionParser(usage="usage: %prog action [options]",)
parser.add_option("-v", action="store_true", metavar="参数的期望值", help="这是-v的参数的意义")

当optparse解析到-h或者–help时,就会调用parser.print_help()方法来打印帮助信息。

分组

异常处理

在出现用户输入无效的、不完整的命令行参数而发生异常时,optparse可以自动检测并处理,比如参数值类型错误等。
用户也可以使用parser.error函数来手动抛出异常。

1
2
3
parser.parse_args()
if parser.isGooodUser:
parse.error("this is an exception")

Ambari Resource 01

发表于 2018-09-04

在使用Ambari的过程中遇到了好多问题,比如删除一个cluster(使用ambari-server reset命令)后,重启Ambari Server之后一直报错,提示找不到集群。尝试各种方法之后,无法找到满意的解决之道的情况下,只好硬着头皮读读源码,了解一下Ambari的启动机制。在此记下源码阅读的笔记和心得。

首先分析一下Ambari Server的启动脚本(ambari-server.py),以便了解Ambari Server是如何启动的。

首先看入口函数

1
2
3
4
5
6
if __name__ == "__main__":
try:
mainBody()
except (KeyboardInterrupt, EOFError):
print("\nAborting ... Keyboard Interrupt.")
sys.exit(1)

没什么可说的,就是调用文件中的 mainBody() 方法。

mainBody方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def mainBody():
#初始化命令行参数解析器
parser = optparse.OptionParser(usage="usage: %prog action [options]",)
action = sys.argv[1]
#init_action_parser方法,在脚本中会有两个方法,一个是针对windows的一个是针对linux的,所以看代码的时候需要注意(方法上使用@OsFamilyFuncImpl标示),我们这里看的是linux的
#就是对命令行解析器进行初始化,并针对行为进行进一步的初始化
init_action_parser(action, parser)
#使用命令行参数解析器,对命令行进行解析
(options, args) = parser.parse_args()
# check if only silent key set
default_options = parser.get_default_values()
silent_options = default_options
silent_options.silent = True
if options == silent_options:
options.only_silent = True
else:
options.only_silent = False
# varbose是在init_action_parser方法中设置的,对应的-v参数。是否打印状态信息
# set_varbose方法位于 ambari_commons.logging_utils中,其实就是设置全局变量_VERBOSE的值。
# set verbose
set_verbose(options.verbose)
#接下来就是调用main方法。这里目测是同一个main方法,最大的区别就是有异常处理,有待之后补充TODO--------------------???
if options.verbose:
main(options, args, parser)
else:
try:
main(options, args, parser)
except Exception as e:
print_error_msg("Unexpected {0}: {1}".format((e).__class__.__name__, str(e)) +\
"\nFor more info run ambari-server with -v or --verbose option")
sys.exit(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def init_action_parser(action, parser):
#定义了参数可用的行为,如ambari-server start,其中start就是行为。这里定义了行为以及对应行为处理操作。
#这些行为的处理方法其实是进一步的初始化参数解析器,会根据对应的行为对参数解析器进行不同的初始化。
action_parser_map = {
SETUP_ACTION: init_setup_parser_options,
SETUP_JCE_ACTION: init_empty_parser_options,
START_ACTION: init_start_parser_options,
STOP_ACTION: init_empty_parser_options,
RESTART_ACTION: init_start_parser_options,
RESET_ACTION: init_reset_parser_options,
STATUS_ACTION: init_empty_parser_options,
UPGRADE_ACTION: init_empty_parser_options,
LDAP_SETUP_ACTION: init_ldap_setup_parser_options,
LDAP_SYNC_ACTION: init_ldap_sync_parser_options,
SET_CURRENT_ACTION: init_set_current_parser_options,
SETUP_SECURITY_ACTION: init_setup_security_parser_options,
REFRESH_STACK_HASH_ACTION: init_empty_parser_options,
BACKUP_ACTION: init_empty_parser_options,
RESTORE_ACTION: init_empty_parser_options,
UPDATE_HOST_NAMES_ACTION: init_empty_parser_options,
CHECK_DATABASE_ACTION: init_empty_parser_options,
ENABLE_STACK_ACTION: init_enable_stack_parser_options,
SETUP_SSO_ACTION: init_setup_sso_options,
DB_PURGE_ACTION: init_db_purge_parser_options,
INSTALL_MPACK_ACTION: init_install_mpack_parser_options,
UNINSTALL_MPACK_ACTION: init_uninstall_mpack_parser_options,
UPGRADE_MPACK_ACTION: init_upgrade_mpack_parser_options,
PAM_SETUP_ACTION: init_pam_setup_parser_options,
KERBEROS_SETUP_ACTION: init_kerberos_setup_parser_options,
}
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="Print verbose status messages")
parser.add_option("-s", "--silent",
action="store_true", dest="silent", default=False,
help="Silently accepts default prompt values. For db-cleanup command, silent mode will stop ambari server.")
try:
#根据行为,做进一步的解析器初始化,我们以setup行为为例,那么setup行为的参数解析器对应的是init_setup_parser_options函数。
action_parser_map[action](parser)
except KeyError:
parser.error("Invalid action: " + action)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
def init_setup_parser_options(parser):
database_group = optparse.OptionGroup(parser, 'Database options (command need to include all options)')
database_group.add_option('--database', default=None, help="Database to use embedded|oracle|mysql|mssql|postgres|sqlanywhere", dest="dbms")
database_group.add_option('--databasehost', default=None, help="Hostname of database server", dest="database_host")
database_group.add_option('--databaseport', default=None, help="Database port", dest="database_port")
database_group.add_option('--databasename', default=None, help="Database/Service name or ServiceID",
dest="database_name")
database_group.add_option('--databaseusername', default=None, help="Database user login", dest="database_username")
database_group.add_option('--databasepassword', default=None, help="Database user password", dest="database_password")
parser.add_option_group(database_group)
jdbc_group = optparse.OptionGroup(parser, 'JDBC options (command need to include all options)')
jdbc_group.add_option('--jdbc-driver', default=None, help="Specifies the path to the JDBC driver JAR file or archive " \
"with all required files(jdbc jar, libraries and etc), for the " \
"database type specified with the --jdbc-db option. " \
"Used only with --jdbc-db option. Archive is supported only for" \
" sqlanywhere database." ,
dest="jdbc_driver")
jdbc_group.add_option('--jdbc-db', default=None, help="Specifies the database type [postgres|mysql|mssql|oracle|hsqldb|sqlanywhere] for the " \
"JDBC driver specified with the --jdbc-driver option. Used only with --jdbc-driver option.",
dest="jdbc_db")
parser.add_option_group(jdbc_group)
other_group = optparse.OptionGroup(parser, 'Other options')
other_group.add_option('-j', '--java-home', default=None,
help="Use specified java_home. Must be valid on all hosts")
other_group.add_option('--stack-java-home', dest="stack_java_home", default=None,
help="Use specified java_home for stack services. Must be valid on all hosts")
other_group.add_option('--skip-view-extraction', action="store_true", default=False, help="Skip extraction of system views", dest="skip_view_extraction")
other_group.add_option('--postgresschema', default=None, help="Postgres database schema name",
dest="postgres_schema")
other_group.add_option('--sqla-server-name', default=None, help="SQL Anywhere server name", dest="sqla_server_name")
other_group.add_option('--sidorsname', default="sname", help="Oracle database identifier type, Service ID/Service "
"Name sid|sname", dest="sid_or_sname")
other_group.add_option('--enable-lzo-under-gpl-license', action="store_true", default=False, help="Automatically accepts GPL license", dest="accept_gpl")
# the --master-key option is needed in the event passwords in the ambari.properties file are encrypted
other_group.add_option('--master-key', default=None, help="Master key for encrypting passwords", dest="master_key")
parser.add_option_group(other_group)

这个方法就是为setup的行为,做了进一步的解析器设置。

1
2
3
4
5
_VERBOSE = False
def set_verbose(newVal):
global _VERBOSE
_VERBOSE = newVal

该方法就是设置全局变量_VERBOSE的值。

main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def main(options, args, parser):
init_logging()
# set silent
set_silent(options.silent)
# debug mode
set_debug_mode_from_options(options)
init_debug(options)
#perform checks
options.warnings = []
if len(args) == 0:
print parser.print_help()
parser.error("No action entered")
action_map = create_user_action_map(args, options)
action = args[0]
try:
action_obj = action_map[action]
except KeyError:
parser.error("Invalid action: " + action)
if action == SETUP_ACTION:
if are_cmd_line_db_args_blank(options):
options.must_set_database_options = True
elif not are_cmd_line_db_args_valid(options):
parser.error('All database options should be set. Please see help for the options.')
else:
options.must_set_database_options = False
#correct database
fix_database_options(options, parser)
matches = 0
for args_number_required in action_obj.possible_args_numbers:
matches += int(len(args) == args_number_required)
if matches == 0:
print parser.print_help()
possible_args = ' or '.join(str(x) for x in action_obj.possible_args_numbers)
parser.error("Invalid number of arguments. Entered: " + str(len(args)) + ", required: " + possible_args)
options.exit_message = "Ambari Server '%s' completed successfully." % action
options.exit_code = None
try:
if action in _action_option_dependence_map:
required, optional = _action_option_dependence_map[action]
for opt_str, opt_dest in required:
if hasattr(options, opt_dest) and getattr(options, opt_dest) is None:
print "Missing option {0} for action {1}".format(opt_str, action)
print_action_arguments_help(action)
print "Run ambari-server.py --help to see detailed description of each option"
raise FatalException(1, "Missing option")
action_obj.execute()
if action_obj.need_restart:
pstatus, pid = is_server_runing()
if pstatus:
print 'NOTE: Restart Ambari Server to apply changes' + \
' ("ambari-server restart|stop+start")'
if options.warnings:
for warning in options.warnings:
print_warning_msg(warning)
pass
options.exit_message = "Ambari Server '%s' completed with warnings." % action
pass
except FatalException as e:
if e.reason is not None:
print_error_msg("Exiting with exit code {0}. \nREASON: {1}".format(e.code, e.reason))
logger.exception(str(e))
sys.exit(e.code)
except NonFatalException as e:
options.exit_message = "Ambari Server '%s' completed with warnings." % action
if e.reason is not None:
print_warning_msg(e.reason)
if options.exit_message is not None:
print options.exit_message
if options.exit_code is not None: # not all actions may return a system exit code
sys.exit(options.exit_code)

ambari_doc

发表于 2018-08-20   |   分类于 Ambari

Stacks and Services

Introduction

Ambari支持Stack的概念,并且将服务组合在一个Stack定义中。通过堆栈的作用,Ambari有统一定义的安装接口,管理并监控一组服务,并且引入了Stacks+Servides来提供延伸。
从Ambari2.3开始,还支持Extension的概念,并将自定义服务组合在一个Extension定义中。

Terminology

Term Description
Stack 定义了一组服务,并且这些服务从这里获取软件包。一个Stack能够有一个或多个版本,并且每个版本可以是活跃/不活跃的。例如, Stack=”HDP-1.3.3”。
Extension 定义了一组自定义服务,这些自定义服务可以被添加到一个stack版本中。一个Extension能够有一个或多个版本。
Service 定义Components(MASTER、SLAVE、CLIENT)组成了Service。如,Service=”HDFS”。
Component 每个Component遵循确切的生命周期(start、stop、install等)。例如:Service=”HDFS”包含的组件有:”NameNode(MASTER)”、”Secondary NameNode(MASTER)”、”DataNode(SLAVE)”和”HDFS Client(CLIENT)”

Overview

Background

Stack的定义可以在源码结构的/ambari-server/src/main/resources/stacks中找到。在你安装了Ambari服务之后,Stack的定义可以在/var/lib/ambari-server/resources/stacks中找到。

Structure

Stack定义的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|_ stacks
|_ <stack_name>
|_ <stack_version>
metainfo.xml
|_ hooks
|_ repos
repoinfo.xml
|_ services
|_ <service_name>
metainfo.xml
metrics.json
|_ configuration
{configuration files}
|_ package
{files, scripts, templates}

Defining a Service and Components

Service中的metainfo.xml描述了这个service、这个service的Components以及管理执行命令的脚本。服务的一个组件只能是MASTER、SLAVE或CLIENT三种类型中的一个。组件的类型用来告诉Ambari管理和监控这个组件的默认脚本。
对于每个Component,用来指从合适执行脚本。这个Component必须支持的一组默认命令。

| Component Category | Default Lifecycle Commands |
| MASTER | install, start, stop, configure, status |
| SLAVE | install, start, stop, configure, status |
| CLIENT | install, configure, status |

Ambari支持用PYTHON写的不同命令。类型用来知道如何执行命令脚本。如果你的Component需要支持多于默认生命周期的命令,你也能够创建自定义命令。
例如,下面的metainfo.xml在YARN服务描述了ResourceManager组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<component>
<name>RESOURCEMANAGER</name>
<category>MASTER</category>
<commandScript>
<script>scripts/resourcemanager.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
<customCommands>
<customCommand>
<name>DECOMMISSION</name>
<commandScript>
<script>scripts/resourcemanager.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
</customCommand>
</customCommands>
</component>

ResourceManager是一个MASTER组件,这个命令脚本为scripts/resourcemanager.py,这个脚本可以在services/YARN/package目录中找到。这个脚本是使用PYTHON写的,并且这个以python方法的方式实现了默认的生命周期命令。下面是install方法,对应的是默认的INSTALL命令:

1
2
3
4
class Resourcemanager(Script):
def install(self, env):
self.install_packages(env)
self.configure(env)

你还可以看到有一个自定义命令DECOMMISSION,这意味着在python命令脚本中还有一个decommission方法:

1
2
3
4
5
6
7
8
9
def decommission(self, env):
import params
...
Execute(yarn_refresh_cmd,
user=yarn_user
)
pass

Using Stack Inheritance

Stacks能够从其他Stack进行继承,以便共享命令脚本和配置。通过如下方式降低了代码的重复:

为子Stack定义了repositories。
在子Stack中添加新的Service(不是在父Stack中)。
重写父级Services的命令脚本。
重写父级Services的配置。

例如:HDP 2.1 Stack继承了HDP 2.0.6 Stack,因此只需要在Stack定义中对HDP 2.1 Stack中适当的修改。这个extension在metainfo.xml中对HDP 2.1 Stack进行定义。

1
2
3
4
5
6
<metainfo>
<versions>
<active>true</active>
</versions>
<extends>2.0.6</extends>
</metainfo>

Example: Implementing a Custom Service

在这个例子中,我们将创建一个名为”SAMPLESRV”的自定义service,并将它添加到已有的Stack定义中。这个service含有 MASTER、SLAVE和CLIENT组件。

Create and Add the Service

1、在Ambari server上,跳转到/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services目录。在这个例子中,我们将浏览到HDP 2.0 stack定义。

1
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services

2、创建一个目录:/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/SAMPLESRV。它用来包含SAMPLESEV的service的定义。

1
2
mkdir /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/SAMPLESRV
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/SAMPLESRV

3、跳转到新创建的SAMPLESRV目录,创建metainfo.xml文件,这个文件用来描述新的service。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version="1.0"?>
<metainfo>
<schemaVersion>2.0</schemaVersion>
<services>
<service>
<name>SAMPLESRV</name>
<displayName>New Sample Service</displayName>
<comment>A New Sample Service</comment>
<version>1.0.0</version>
<components>
<component>
<name>SAMPLESRV_MASTER</name>
<displayName>Sample Srv Master</displayName>
<category>MASTER</category>
<cardinality>1</cardinality>
<commandScript>
<script>scripts/master.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
</component>
<component>
<name>SAMPLESRV_SLAVE</name>
<displayName>Sample Srv Slave</displayName>
<category>SLAVE</category>
<cardinality>1+</cardinality>
<commandScript>
<script>scripts/slave.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
</component>
<component>
<name>SAMPLESRV_CLIENT</name>
<displayName>Sample Srv Client</displayName>
<category>CLIENT</category>
<cardinality>1+</cardinality>
<commandScript>
<script>scripts/sample_client.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
</component>
</components>
<osSpecifics>
<osSpecific>
<osFamily>any</osFamily> <!-- note: use osType rather than osFamily for Ambari 1.5.0 and 1.5.1 -->
</osSpecific>
</osSpecifics>
</service>
</services>
</metainfo>

4、在上面,我们的service名为“SAMPLESRV”,它包含:

一个名为”SAMPLESRV_MASTER”的MASTER的组件。
一个名为“SLAVE”的SLAVE的组件。
一个名为“CLIENT”的CLIENT的组件。

5、接下来,创建命令脚本。为脚本创建目录/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/SAMPLESRV/package/scripts,并在这个目录中定义service的metainfo。

1
2
mkdir -p /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/SAMPLESRV/package/scripts
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/SAMPLESRV/package/scripts

跳转到script目录,并创建.py命令脚本文件。
master.py文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import sys
from resource_management import *
class Master(Script):
def install(self, env):
print 'Install the Sample Srv Master';
def stop(self, env):
print 'Stop the Sample Srv Master';
def start(self, env):
print 'Start the Sample Srv Master';
def status(self, env):
print 'Status of the Sample Srv Master';
def configure(self, env):
print 'Configure the Sample Srv Master';
if __name__ == "__main__":
Master().execute()

slave.py文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import sys
from resource_management import *
class Slave(Script):
def install(self, env):
print 'Install the Sample Srv Slave';
def stop(self, env):
print 'Stop the Sample Srv Slave';
def start(self, env):
print 'Start the Sample Srv Slave';
def status(self, env):
print 'Status of the Sample Srv Slave';
def configure(self, env):
print 'Configure the Sample Srv Slave';
if __name__ == "__main__":
Slave().execute()

sample_client.py文件:

1
2
3
4
5
6
7
8
9
import sys
from resource_management import *
class SampleClient(Script):
def install(self, env):
print 'Install the Sample Srv Client';
def configure(self, env):
print 'Configure the Sample Srv Client';
if __name__ == "__main__":
SampleClient().execute()

7、现在重启Ambari Server以便新的service定义分发到集群的所有Agents。

1
ambari-server restart

Install the Service(Via Ambari WEb “Add Service”)

从Ambari 1.7.0开始,可以通过Ambari Web来添加自定义服务。

在Ambari web页面,跳转到services,并点击左边service导航部分的Action按钮。
点击“Add Services”。你将看到一个包含“My Sample Service”(在metainfo.xml文件中定义service的)的选项。
选择“My Sample service”并点击下一步。
选择“Sample Srv Master”并点击下一步。
选择host来安装”Sample Srv Client”,并点击下一步。
一旦完成,”My Sample Service”将会在在service导航区可用。
如果你想要为所有host添加“Sample Srv Client”,你可以跳转到Host,并到航道指定的host并点击”+ Add”。

Example: Implementing a Custom Client-only Service

在这个例子中,我们将创建一个名为“TESTSRV”的自定义service,添加到已经存在的Stack定义上,并Ambari APIs来安装/配置这个service。这个service是一个CLIENT,因此它有两个命令:install和configure。

Create and Add the Service

1、在Ambari Service上,跳转到 /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services目录。在这个例子中,我们将跳转到HDP2.0 Stack 定义中。

1
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services

2、创建一个名为/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTSRV的目录,它用来包含TESTSRV的service定义。

1
2
mkdir /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTSRV
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTSRV

3、跳转到新创建的TESTSRV目录,创建一个名为metainfo.xml的文件用来描述新的service。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0"?>
<metainfo>
<schemaVersion>2.0</schemaVersion>
<services>
<service>
<name>TESTSRV</name>
<displayName>New Test Service</displayName>
<comment>A New Test Service</comment>
<version>0.1.0</version>
<components>
<component>
<name>TEST_CLIENT</name>
<displayName>New Test Client</displayName>
<category>CLIENT</category>
<cardinality>1+</cardinality>
<commandScript>
<script>scripts/test_client.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
<customCommands>
<customCommand>
<name>SOMETHINGCUSTOM</name>
<commandScript>
<script>scripts/test_client.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
</customCommand>
</customCommands>
</component>
</components>
<osSpecifics>
<osSpecific>
<osFamily>any</osFamily> <!-- note: use osType rather than osFamily for Ambari 1.5.0 and 1.5.1 -->
</osSpecific>
</osSpecifics>
</service>
</services>
</metainfo>

4、在上面,我们的service名为“TESTSRV”,并且它包含了一个名为“TEST_CLIENT”的组件,这个组件属于CLIENT类型。这个client通过命令脚本scripts/test_client.py来管理。接下来创建命令脚本。
5、为命令脚本创建目录/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTSRV/package/scripts。

1
2
mkdir -p /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTSRV/package/scripts
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTSRV/package/scripts

6、跳转到scripts目录,并创建test_client.py文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
import sys
from resource_management import *
class TestClient(Script):
def install(self, env):
print 'Install the client';
def configure(self, env):
print 'Configure the client';
def somethingcustom(self, env):
print 'Something custom';
if __name__ == "__main__":
TestClient().execute()

7、现在,重启Ambari Server,将新的service定义分发到集群的所有Agents。

1
ambari-server restart

Install the Service(Via the Ambari REST API)

1、将Service添加到Cluster。

1
2
3
4
5
6
7
8
9
POST
/api/v1/clusters/MyCluster/services
{
"ServiceInfo": {
"service_name":"TESTSRV"
}
}

2、添加组件到Service。这个例子中,添加TEST_CLIENT到TESTSRV。

1
2
POST
/api/v1/clusters/MyCluster/services/TESTSRV/components/TEST_CLIENT

3、将组件添加到所有host。例如,要安装到c6402.ambari.apache.org和c6403.ambari.apache.org上,首先使用POST在这些主机上创建host_component源

1
2
3
4
5
POST
/api/v1/clusters/MyCluster/hosts/c6402.ambari.apache.org/host_components/TEST_CLIENT
POST
/api/v1/clusters/MyCluster/hosts/c6403.ambari.apache.org/host_components/TEST_CLIENT

4、现在,需要在所有主机上安装组件。在这个命令中,你指导Ambari来安装与这个service有关的所有组件。调用每个主机上命令脚本的install方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT
/api/v1/clusters/MyCluster/services/TESTSRV
{
"RequestInfo": {
"context": "Install Test Srv Client"
},
"Body": {
"ServiceInfo": {
"state": "INSTALLED"
}
}
}

除了同时安装所有的组件,你还可以只在某一台机器上装组件。在这个例子中,我们在c6402.ambari.apache.org上安装TEST_CLIENT:

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT
/api/v1/clusters/MyCluster/hosts/c6402.ambari.apache.org/host_components/TEST_CLIENT
{
"RequestInfo": {
"context":"Install Test Srv Client"
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}

6、使用如下信息配置主机上的client。这将最终调用命令脚本中的configure()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
POST
/api/v1/clusters/MyCluster/requests
{
"RequestInfo" : {
"command" : "CONFIGURE",
"context" : "Config Test Srv Client"
},
"Requests/resource_filters": [{
"service_name" : "TESTSRV",
"component_name" : "TEST_CLIENT",
"hosts" : "c6403.ambari.apache.org"
}]
}

7、如果你想看哪些主机完成了安装。

1
2
GET
/api/v1/clusters/MyCluster/components/TEST_CLIENT

Install the Service(via Ambari Web “Add Services”)

1、在Ambari Web界面,跳转到Services并点击左侧Service导航区的Actions按钮。
2、点击“Add Services”。你将看到一个“My Test Service”的选项(在service的metainfo.xml文件中services的中定义)。
3、选择“My Test Service”并点击下一步。
4、选择主机来安装“New Test Client”并点击下一步。
5、一旦完成,“My Test Service”将在Service导航区中可用。
6、如果你想要在其他主机上添加“New Test Client”,你可以跳转到Hosts,并指定主机后点击“+ Add”。

Example: Implementing a Custom Client-only Service (with Configs)

在这个例子中,我们将创建一个名为“TESTCONFIGSRV”的自定义service,并将其添加到已有的Stack定义上。这个service是一个CLIENT类型,因此它有两个命令:install和configure。service还包含”test-confg”配置类型。

Create and Add the Service to Stack

1、在Ambari Server上,跳转到/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services目录。在这个例子中,我们将跳转到HDP 2.0 Stack定义。

1
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services

2、创建名为/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV的目录,它包含了为TESTCONFIGSRV定义的service。

1
2
mkdir /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV

3、跳转到刚刚创建的TESTCONFIGSRV目录,创建一个metainfo.xml文件,这个文件描述了这个新的service。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0"?>
<metainfo>
<schemaVersion>2.0</schemaVersion>
<services>
<service>
<name>TESTCONFIGSRV</name>
<displayName>New Test Config Service</displayName>
<comment>A New Test Config Service</comment>
<version>0.1.0</version>
<components>
<component>
<name>TESTCONFIG_CLIENT</name>
<displayName>New Test Config Client</displayName>
<category>CLIENT</category>
<cardinality>1+</cardinality>
<commandScript>
<script>scripts/test_client.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
</component>
</components>
<osSpecifics>
<osSpecific>
<osFamily>any</osFamily> <!-- note: use osType rather than osFamily for Ambari 1.5.0 and 1.5.1 -->
</osSpecific>
</osSpecifics>
</service>
</services>
</metainfo>

4、在上面,我的service的名为“TESTCONFIGSRV”,并且它包含一个名为“TESTCONFIG_CLIENT”组件,这个组件的类型为“CLINT”。这个client通过命令脚本scripts/test_client.py来管理。接下来创建这个命令脚本。
5、为命令脚本创建目录/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV/package/scripts,这个脚本在ervice metainfo 中指定。

1
2
mkdir -p /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV/package/scripts
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV/package/scripts

6、调转到scripts目录,并创建test_client.py文件。例如:

1
2
3
4
5
6
7
8
9
10
11
import sys
from resource_management import *
class TestClient(Script):
def install(self, env):
print 'Install the config client';
def configure(self, env):
print 'Configure the config client';
if __name__ == "__main__":
TestClient().execute()

7、现在,为这个service定义配置类型。为配置目录/var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV/configuration创建一个目录。

1
2
mkdir -p /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV/configuration
cd /var/lib/ambari-server/resources/stacks/HDP/2.0.6/services/TESTCONFIGSRV/configuration

8、跳转到配置目录,并创建test-config.xml文件。例如:

1
2
3
4
5
6
7
8
9
10
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>some.test.property</name>
<value>this.is.the.default.value</value>
<description>This is a kool description.</description>
</property>
</configuration>

9、现在,重启Ambari Server以便将service分发到集群中的所有Agent上。

1
ambari-server restart

How-To Define Stacks and Services

Ambari管理的Services在Ambari的stacks文件夹中定义。
要定义自己的services和stacks并被Ambari管理,请遵循如下步骤。
上面的create your custom stack and service的例子也可以学习。
stack是services的集合。一个stack可以定义多个版本,每个版本有自己的一组service。Ambari中的Stacks在 ambari-server/src/main/resources/stacks 文件夹中定义,这个文件夹可以在安装之后的/var/lib/ambari-server/resources/stacks目录找到。
被stack管理的servces能够在 ambari-server/src/main/resources/common-services 或 ambari-server/src/main/resources/stacks 文件夹中定义。这些文件对应安装后的目录分别为:/var/lib/ambari-server/resources/common-services 或 /var/lib/ambari-server/resources/stacks。

Question : 什么时候在 common-services 或 stacks 文件夹中定义service呢
当service可能被用于多个stacks时,我们将在common-services文件夹中定义service。例如,几乎所有的stacks都需要HDFS service,因此将它定义在common-services而不是在每个stack中定义是值得推荐的。同样,如果一个service从不会被共享,它能够被定义在stack文件夹中。基本上来说stacks文件夹中定义services是不推荐的,而推荐将service定义在common-services中。

Define Service

下面展示了如何在common-services文件夹中定义一个service。在stacks文件夹中定义services时,也可以使用相同的方法,具体会在定义stack章节介绍。

Services必须提供主要的metainfo.xml文件,它提供了关于这个service的重要元数据。
除此之外,其他文件提供了关于server的更多信息。关于这些文件的更多信息将在下面提供。

一个service也可能继承自它的之前版本或common services。关于继承的更多信息,请查看Service Inheritance。

metainfo.xml

在metainfo.xml服务描述符中,首先被定义的是service和它的components。
完整的参考文献可以在Writing metainfo.xml中找到。
值得推荐的metainfo.xml实现是HDFS metainfo.xml。

Question : 是否可以在同一个metainfo.xml中定义多个services?
可以。尽管可以,但是强烈拒绝在相同的service文件夹中定义多个services。
YARN和MapReduces2就被一起定义在YARN文件夹中。它的metainfo.xml同时定义了两个services。

Scripts

对于components的定义,我们需要提供脚本来处理service的不同阶段以及组件的生命周期。
管理service和components的脚本在metainfo.xml(HDFS)中指定。
每个脚本应该继承Script类,这个父类提供了有用的方法。例如:NameNode script。


这些脚本应该在//package/script文件夹中提供。

Folder Purpose
package/script 包含了由Ambari执行的脚本。这些脚本使用正确的环境被加载到执行路径中。例如:HDFS
package/files 包含被上面脚本使用的文件。一般是其他一些作为独立进程执行的脚本(bash、python等)。例如:checkWebUI.py在HDFS检查中运行,用来确定Journal Node是否活跃。
package/tmplates 上述脚本在管理节点上生成的临时文件。一般是service需要操作的配置文件。例如:exclude_hosts_list.j2 ,被脚本使用来产生/etc/hadoop/conf/dfs.exclude。

Python

Ambari默认支持python脚本来管理service和components。
component脚本应该继承resource_management.Script类并提供component的生命周期所需的方法。
参考how to create custom stack页面,MASTER、SLAVE和CLIENT组件贯穿生命周期所需的方法如下:
master.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import sys
from resource_management import Script
class Master(Script):
def install(self, env):
print 'Install the Sample Srv Master';
def stop(self, env):
print 'Stop the Sample Srv Master';
def start(self, env):
print 'Start the Sample Srv Master';
def status(self, env):
print 'Status of the Sample Srv Master';
def configure(self, env):
print 'Configure the Sample Srv Master';
if __name__ == "__main__":
Master().execute()

slave.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import sys
from resource_management import Script
class Slave(Script):
def install(self, env):
print 'Install the Sample Srv Slave';
def stop(self, env):
print 'Stop the Sample Srv Slave';
def start(self, env):
print 'Start the Sample Srv Slave';
def status(self, env):
print 'Status of the Sample Srv Slave';
def configure(self, env):
print 'Configure the Sample Srv Slave';
if __name__ == "__main__":
Slave().execute()

client.py

1
2
3
4
5
6
7
8
9
import sys
from resource_management import Script
class SampleClient(Script):
def install(self, env):
print 'Install the Sample Srv Client';
def configure(self, env):
print 'Configure the Sample Srv Client';
if __name__ == "__main__":
SampleClient().execute()

Ambari提供了有用的Python库,以便在以下方面提供写servier脚本的帮助。对于这些库的完整介绍,请通过Ambari Python Libraries页面访问。

resource_management
ambari_commons
ambari_simplejson

OS Variant Script

如果service支持多个操作系统,则需要根据不同的操作系统由独立的脚本,可以继承resource_management.Script类并使用不同的@OSFamilyImpl()注解。
这能够区分组件的不同操作系统的方法。
例如: NameNode default script, NameNode Windows script

Examples
NameNode Start, Stop
DataNode Start and Stop
HDFS configurations persistence

Custom Actions

有些时候,services需要执行一些行为,这些行为不同于Ambari提供的默认行为(install、start、stop、configure等)。
services能够定义一些action,并将这些action在UI中展示给用户,因此这些行为能够方便执行。
举例说明,如HDFS实现的Rebalance HDFS自定义行为。

Stack Changes

1、在metainfo.xml中component的部分中定义指定义命令。
2、在metainfo.xml相关的脚本中,用相同的名字实现方法,来作为自定义命令。
a)如果自定义命令不含有操作系统变量,它可以在同一个继承resource_management.Script的类中被实现。
b)如果含有操作系统变量,每个类中的不同方法可以通过@OsFamilyImpl(os_family=…)来实现。Default rebalancehdfs, Windows rebalancehdfs。
这将提供在安装了service的被管理的主机上以后端方式运行脚本的能力。

UI Changes

在host页面上查看自定义action不需要修改UI。
action将展示在主机组件的action列表中。任何master-component action将自动展示在service的action菜单上。
当action被点击后,将自动产生POST调用来触发上面定义的脚本。

Question : 如何为UI中的自定义action提供自己的标签和图标?
在Ambari UI中,使用自定义图标和名称,添加你的component action到App.HostComponentActionMap对象。

Configuration

service的配置文件应当位于默认的configuration文件夹中。
如果使用了不同的文件夹,metainfo.xml中的,可以用来指明使用的文件夹。
metainfo.xml中需要考虑配置的重要部分是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?xml version="1.0"?>
<metainfo>
<schemaVersion>2.0</schemaVersion>
<services>
<service>
<name>HDFS</name>
<displayName>HDFS</displayName>
<comment>Apache Hadoop Distributed File System</comment>
<version>2.1.0.2.0</version>
<components>
...
<component>
<name>HDFS_CLIENT</name>
...
<configFiles>
<configFile>
<type>xml</type>
<fileName>hdfs-site.xml</fileName>
<dictionaryName>hdfs-site</dictionaryName>
</configFile>
<configFile>
<type>xml</type>
<fileName>core-site.xml</fileName>
<dictionaryName>core-site</dictionaryName>
</configFile>
<configFile>
<type>env</type>
<fileName>log4j.properties</fileName>
<dictionaryName>hdfs-log4j,yarn-log4j</dictionaryName>
</configFile>
<configFile>
<type>env</type>
<fileName>hadoop-env.sh</fileName>
<dictionaryName>hadoop-env</dictionaryName>
</configFile>
</configFiles>
...
<configuration-dependencies>
<config-type>core-site</config-type>
<config-type>hdfs-site</config-type>
</configuration-dependencies>
</component>
...
</components>
<configuration-dir>configuration</configuration-dir>
<configuration-dependencies>
<config-type>core-site</config-type>
<config-type>hdfs-site</config-type>
<config-type>hadoop-env</config-type>
<config-type>hadoop-policy</config-type>
<config-type>hdfs-log4j</config-type>
<config-type>ranger-hdfs-plugin-properties</config-type>
<config-type>ssl-client</config-type>
<config-type>ssl-server</config-type>
<config-type>ranger-hdfs-audit</config-type>
<config-type>ranger-hdfs-policymgr-ssl</config-type>
<config-type>ranger-hdfs-security</config-type>
<config-type>ams-ssl-client</config-type>
</configuration-dependencies>
</service>
</services>
</metainfo>

config-type - 字符串表示的一组配置。例如:core-site, hdfs-site, yarn-site等。当配置在Ambari中保存,它们被固化到一个config-type版本中,而且这个版本是不可变的。如果你更改并保存HDFS core-site配置4次,你将有4个版本的config-type core-site。同样,当一个service的配置被保存时,只有更改的config-type被更新。
configFiles - 列出由处理的配置文件。
configFile - 某种类型的一个配置文件。
type - 基于文件内容的不同指定文件的类型
xml - Hadoop中友好的方式,XML文件。
env - 通常用于将内容值作为模版的脚本。模版具有配置标签,并且它的值在运行时生成。
properties - 生成属性文件,每条属性的格式为key=value。
dictionaryName - 配置类型的名字。
configuration-dependencies - 列出component或service所依赖的config-type的列表。
configuration-dir - configFiles所指定的文件所处的目录。可选的,默认为configuration。

Adding new configs in a config-type

向config-type中添加一个配置项时有很多不同的参数可选。它们在这里被全面介绍。

UI - Categories

上面的定义的配置在service的配置页面显示。
要自定义分类并在UI中对配置进行排序,需要更新下面的文件。
Create Category - 更新 ambari-web/app/models/stack_service.js 文件,用来添加自己的service,以及你的新分类。
Use Category - 要将配置置于某种分类中,并指定配置的顺序,将配置添加到 ambari-web/app/data/HDP2/site_properties.js 文件中。在这个文件中,可以指定需要使用到分类,以及配置的索引。ambari-web/app/data中的stack文件夹时分层的且继承自前一个版本。片段中的配置属性在这里定义。例如 Hive Categories, Tez Categories

UI - Enhanced Configs

Enhanced Config特性使得服务提供者能够定制他们自己的service配置,并确定哪些配置主要显示给用户,而不需要修改任何UI代码。自定义包括为service提供友好的布局,更好的控制(sliders, combos, lists, toggles, spinners, etc)、更好的验证(minimum, maximum, enums)、自动的单位转换(MB, GB, seconds, milliseconds, etc.)、配置依赖以及默认值的动态推荐。
servier提供者能够达成上面所有的,只需要在stacks文件夹中修改它们service的定义。
在Enhanced Configs页面中查看更多。

Alerts

通过提供一个alert.js文件,每个service都能够定义Ambari应该跟踪的警报。
在Alerts wiki page页面能够读到更多关于报警框架的信息,而alerts.json文件的格式在Alerts definition document中可以了解到。

Kerberos

Ambari能够对一个集群启用或禁用Kerberos。要通知Ambari服务及其组件使用的身份和配置,每个服务需要提供一个kerberos.json文件。
在Automated Kerberizationwiki页面可以读到关于Kerberos的支持的信息,还可以在Kerberos Descriptor documentation中得到Kerberos的描述信息。

Metrics

对于Hadoop和Ambari管理的集群,Ambari提供了Ambari Metrics System服务,用来收集、聚合系统的metrics。
每个service可以定义哪些metrics能够被AMS收集,通过metrics.json文件来定义。你可以在Stack Defined Metrics页面中得到关于metrics.json格式的信息。

Quick Links

一个service通过向一个文本添加metainfo来实现向Ambari web UI中添加一个快速链接的列表,添加数据的文本遵循一个预定义JSON格式。Ambari server解析quicklink JSON文件,并将它的内容展示在UI。因此,Ambari web UI能够根据这些信息计算quick link URLs,并相应的填充quicklink的下拉列表。
关于quick link的JSON文件的设计,可以参看Quick Links页面。

Widgets

每个service都可以通过定一个widgets.json文件来定义在service的摘要页面上默认显示哪些widgets和heatmaps。
你可以在Enhanced Service Dashboard页面中看到更多关于widgets描述符的信息。

Role Command Order

从Ambari 2.2开始,每个service通过在service文件夹中包含一个role_rommand_order.json文件来定义自己的role command order。这个service应当只指定它的组件到其他组件之间的关系。换句话说,如果service只包含COMP_X,那么servier应当只列出与COMP_X相关的依赖。如果COMP_X启动,它依赖于NameNode的启动,当NameNode停止时,NameNode应该要等COMP_X先停止,下面的信息将被包含在role command order中:
Example service role_command_order.json

1
2
"COMP_X-START": ["NAMENODE-START"],
"NAMENODE-STOP": ["COMP_X-STOP"]

service的role command order条目将会与stack中定义的role command order合并。例如,因为stack已经依赖NAMENODE_STOP,在上面的例子中,COMP_X-STOP将被添加到NAMENODE-STOP的依赖,此外,COMP_X-START对NAMENODE-START的依赖将作为一个新的依赖项被添加。
对于role command order的更多信息,可以查看Role Command Order章节。

Service Advisor

从Ambari 2.4开始,每个service可以选择定义自己的service advisor,而不是在stack advisor中定义它的配置和布局的细节。这专门用于哪些没有在stack中定义的自定义service。service能够在它的service文件夹中编写一个名为service-advisor.py的Python脚本来提供Service Advisor的能力。这个文件夹可以位于定义service的stack的services目录或者用来定义可继承service的common-services目录。例如:common-services/HAWQ/2.0.0。
与Stack-advisor脚本不同,service-advisor脚本不会自动的继承父级service的service-advisor脚本。service-advisor脚本需要声明来继承它们父级service的service-advisor脚本。下面的代码向你展示了如何引用父级service的service-advisor.py。在这个例子中,它继承了位于resource/stacks中的顶级service-advisor.py。
Sample service-advisor.py file inheritance

1
2
3
4
5
6
7
8
9
10
11
12
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
try:
with open(PARENT_FILE, 'rb') as fp:
service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
except Exception as e:
traceback.print_exc()
print "Failed to load parent"
class HAWQ200ServiceAdvisor(service_advisor.ServiceAdvisor):

与stack advisors类似,service advisor在4个重要概念上提供了信息:

1、推荐集群上service的布局。
2、推荐service配置。
3、验证集群上service的布局。
4、验证service配置。
通过提供的service-advisor.py文件,service能够动态控制上面的每一个。
对于service-advisor脚本来说主要接口是如何调用上面的每一项,以及给它们提供什么数据。

Base service_advisor.py from resources/stacks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class ServiceAdvisor(DefaultStackAdvisor):
"""
Abstract class implemented by all service advisors.
"""
"""
If any components of the service should be colocated with other services,
this is where you should set up that layout. Example:
# colocate HAWQSEGMENT with DATANODE, if no hosts have been allocated for HAWQSEGMENT
hawqSegment = [component for component in serviceComponents if component["StackServiceComponents"]["component_name"] == "HAWQSEGMENT"][0]
if not self.isComponentHostsPopulated(hawqSegment):
for hostName in hostsComponentsMap.keys():
hostComponents = hostsComponentsMap[hostName]
if {"name": "DATANODE"} in hostComponents and {"name": "HAWQSEGMENT"} not in hostComponents:
hostsComponentsMap[hostName].append( { "name": "HAWQSEGMENT" } )
if {"name": "DATANODE"} not in hostComponents and {"name": "HAWQSEGMENT"} in hostComponents:
hostComponents.remove({"name": "HAWQSEGMENT"})
"""
def colocateService(self, hostsComponentsMap, serviceComponents):
pass
"""
Any configuration recommendations for the service should be defined in this function.
This should be similar to any of the recommendXXXXConfigurations functions in the stack_advisor.py
such as recommendYARNConfigurations().
"""
def getServiceConfigurationRecommendations(self, configurations, clusterSummary, services, hosts):
pass
"""
Returns an array of Validation objects about issues with the hostnames to which components are assigned.
This should detect validation issues which are different than those the stack_advisor.py detects.
The default validations are in stack_advisor.py getComponentLayoutValidations function.
"""
def getServiceComponentLayoutValidations(self, services, hosts):
return []
"""
Any configuration validations for the service should be defined in this function.
This should be similar to any of the validateXXXXConfigurations functions in the stack_advisor.py
such as validateHDFSConfigurations.
"""
def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
return []

Examples


  • Service Advisor interface

  • HAWQ 2.0.0 Service Advisor implementation

  • PXF 3.0.0 Service Advisor implementation

Service Upgrade

从Ambari开始,每个service能够在它的service definition中定义它自己的更新。这对哪些不再需要修改stack的upgrade-packs的自定义service,以便它们融合到集群的更新。

每个service能够定义upgrade-packs,upgrade-packs是一些XML文件,它们描述了某个service的更新进程已经这个更新包如何与所有的stack更新包相关联。这些upgrade-pack XML文件在service的upgrades/文件夹中的独立的子文件夹中,这些子文件夹指明了需要扩展的stack版本。测试代码中的一些例子。

Examples


  • Upgrades folder

  • Upgrade-pack XML

service定义的每个upgrade-pack通过一个特定的stack版本,应当匹配service定义的文件名。例如,在测试代码中,HDP 2.2.0有一个名为upgrade_test_15388.xml的upgrade-pack。HDFS service定义了一个extension来扩展那个upgrade packHDP/2.0.5/services/HDFS/upgrades/HDP/2.2.0/upgrade_test_15388.xml。在这个例子中,upgrade-pack定义在HDP/2.0.5的stack中。这个upgrade-pack是HDP/2.2.0的一个扩展,因为他被定义在upgrade/HDP/2.2.0目录中。最终,扩展到upgrad-pack upgrade_test_15388.xml的service的名字与HDP/2.2.0/upgrades中的upgrade-pack的名字匹配。
对于service的文件格式与stack的有很大的相同。target、target-stack和type属性应该和stack的upgrade-pack的信息完全对应。service能够添加自己的前提检测。

General Attributes and Prerequisite Checks

1
2
3
4
5
6
7
<upgrade xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<target>2.4.*</target>
<target-stack>HDP-2.4.0</target-stack>
<type>ROLLING</type>
<prerequisite-checks>
<check>org.apache.ambari.server.checks.FooCheck</check>
</prerequisite-checks>

upgrade-pack的部分,由标签组成,就像stack的upgrade-pack。关键的不同是如何定义这些,使它们与stack的upgrade pack的或其他service的upgrade pack的相关联。在第一个例子中,我们引入了名为PRE_CLUSTER的并为名为FOO的service新增了一个。该项应该在基于标签的HDFS之后的中添加。

Order Section - Add After Group Entry

1
2
3
4
5
6
7
8
9
<order>
<group xsi:type="cluster" name="PRE_CLUSTER" title="Pre {{direction.text.proper}}">
<add-after-group-entry>HDFS</add-after-group-entry>
<execute-stage service="FOO" component="BAR" title="Backup FOO">
<task xsi:type="manual">
<message>Back FOO up.</message>
</task>
</execute-stage>
</group>

同样的语法也可以被用于service检查优先级和group services等。

Order Section - Further Add After Group Entry Examples

1
2
3
4
5
6
7
8
9
10
11
12
13
<group name="SERVICE_CHECK1" title="All Service Checks" xsi:type="service-check">
<add-after-group-entry>ZOOKEEPER</add-after-group-entry>
<priority>
<service>HBASE</service>
</priority>
</group>
<group name="CORE_MASTER" title="Core Masters">
<add-after-group-entry>YARN</add-after-group-entry>
<service name="HBASE">
<component>HBASE_MASTER</component>
</service>
</group>

还可以在stack的upgrade-pack中增加新的group,并将它们排列在其他group之后。在下面的例子中,我们在使用标签的HIVE的group之后增加了一个名为FOO的group。

Order Section - Add After Group

1
2
3
4
5
6
7
8
<group name="FOO" title="Foo">
<add-after-group>HIVE</add-after-group>
<skippable>true</skippable>
<allow-retry>false</allow-retry>
<service name="FOO">
<component>BAR</component>
</service>
</group>

你还可以在同一个中同时创建和。这将会在指定的group不存在的情况下才会创建一个新的group,并且会将他排列在指定的group之后。将会确定它的group的service的内部排序、优先级和执行阶段。

Order Section - Add After Group

1
2
3
4
5
6
7
8
9
<group name="FOO" title="Foo">
<add-after-group>HIVE</add-after-group>
<add-after-group-entry>FOO</add-after-group-entry>
<skippable>true</skippable>
<allow-retry>false</allow-retry>
<service name="FOO2">
<component>BAR2</component>
</service>
</group>

upgrade-pack剩余的部分,与stack的upgrade-pack的相同。

Processing Section

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<processing>
<service name="FOO">
<component name="BAR">
<upgrade>
<task xsi:type="restart-task" />
</upgrade>
</component>
<component name="BAR2">
<upgrade>
<task xsi:type="restart-task" />
</upgrade>
</component>
</service>
</processing>

Define Stack

一个Stack就是一个版本化的service的集合。每个stack就是一个定义在ambari-server/src/main/resource/stacks中的一个文件夹。安装ambari之后,stack的定义则位于ambari-server主机的/var/lib/ambari-server/resources/stacks中。
每个stack文件夹中包含该stack的每个版本的子文件夹。一些stack版本可用,一些不可用。每个stack版本包含一些service,这些service有的继承自common-services,有些在stack版本的services中定义。

Example : HDP stack.HDP-2.4 stack version。

Stack-Version Descriptor

每个Stack-version应当提供一个metainfo.xml(如:HDP-2.3、HDP-2.4 )文件作为描述符,它如下描述了stack-verion:

1
2
3
4
5
6
7
8
<metainfo>
<versions>
<active>true</active>
</versions>
<extends>2.3</extends>
<minJdk>1.7</minJdk>
<maxJdk>1.8</maxJdk>
</metainfo>

versions/active - 当前版本的stack是否还可以用于安装。如果不可用,这个版本在安装的时候将不会在UI中显示。
extends - 当前stack继承的版本。进行继承的stack版本会继承service以及父stack版本的所有方面。
minJdk - stack版本支持的最低JDK版本。在安装向导期间如果被Ambari使用的JDK低于这个版本,用户将被警告。
maxJdk - stack版本支持的最高JDK版本。在安装向导期间,如果被Ambari使用的JDK版本高于这个版本,用户将被警告。

Stack Properties

stack必须包含或继承一个属性字典,属性字典包含两个文件:stack_features.json和stack_tools.json。这个字典是在Ambari 2.4中新增的。
stack_features.json中包含了一个features的列表,这个列表指定了哪些版本的stack包含这些特性。
特性列表由特定的Ambari版本所确定。特定Ambari版本的详细列表能够在HDP/2.0.6/properties/stack_features.json中找到。每个feature由name、description以及特性所支持stack的最高版本和最低版本来构成。

1
2
3
4
5
6
7
8
9
10
{
"stack_features": [
{
"name": "snappy",
"description": "Snappy compressor/decompressor support",
"min_version": "2.0.0.0",
"max_version": "2.2.0.0"
},
...
}

stack_tools.json包含了stack_selector和conf_selector这两个工具对应的名称以及安装位置。
任何自定义的stack必须包含这两个JSON文件。更多的信息请查看Stack Properties的wiki页面。

Services

每个stack版本中都包含services,这些services要么是引用的common-services中的,要么是在stack版本中services文件夹下定义的。
common-services中定义的services能够被多个stack共享。如果他们不会被共享,那么他们可以定义在stack版本中。

Reference common-services

要引用common-services中的一个service,service描述文件需要使用项。(例如: HDFS in HDP-2.0.6)

1
2
3
4
5
6
7
8
9
<metainfo>
<schemaVersion>2.0</schemaVersion>
<services>
<service>
<name>HDFS</name>
<extends>common-services/HDFS/2.1.0.2.0</extends>
</service>
</services>
</metainfo>

Define Service

与common-services中定义的services格式相同,可以子啊services文件夹中定义新的service。
Examples:


  • HDFS in BIGTOP-0.8

  • GlusterFs in HDP-2.3.CusterFs

Extend Service

当一个版本继承另外一个版本时,它继承父级service的所有细节。它也可以自由的重写或删除继承的service定义的任何部分。
Examples:


  • HDP-2.3/HDFS - 添加NFS_GATEWAY组件,更新service版本和OS特定包

  • HDP-2.2/Storm - 删除了STORM_REST_API组件,更新service版本和OS特定包

  • HDP-2.3/YARN - 从capacity-scheduler.mxl中删除YARN node-lable配置

  • HDP-2.3/Kafka - 增加Kafka Broker进程告警

Role Command Order

Role是Component(如:NAMENODE、DATANODE、RESOURCEMANAGER、HBASE_MASTER等)的另一个名称。
顾名思义,它可以告诉Amberi在你stack中定义的component执行命令的顺序。
例如:”ZooKeeper Server 应当在启动NameNode之前启动”。“HBase Master应当在NameNode和DataNode启动之后再启动”。
这可以通过在stack-version文件夹中包含role_command_order.json来具体说明。

Format

以JSON格式指定,这个文件包含一个JSON对象,并且顶级key是section名称或comments。如:HDP-2.0.6。
在每个section对象内部,key描述了它对应的component的行为,value列出当前component-action之前应当完成的component-action。
Structure of role_command_order.json

1
2
3
4
5
6
7
8
9
10
11
{
"_comment": "Section 1 comment",
"section_name_1": {
"_comment": "Section containing role command orders",
"<DEPENDENT_COMPONENT_1>-<COMMAND>": ["<DEPENDS_ON_COMPONENT_1>-<COMMAND>", "<DEPENDS_ON_COMPONENT_1>-<COMMAND>"],
"<DEPENDENT_COMPONENT_2>-<COMMAND>": ["<DEPENDS_ON_COMPONENT_3>-<COMMAND>"],
...
},
"_comment": "Next section comment",
...
}

Sections

Ambari只使用了如下的sections:

Section Name When Used
general_deps 适用于所有情况
optional_glusterfs 当集群有GLUSTERFS服务实例时
optional_no_glusterfs 当集群没有GLUSTERFS服务实例时
namenode_optional_ha 当安装了HDFS服务,且有JOURNALNODE组件时
resourcemanager_optional_ha 当安装了YARN服务,且存在多个RESOURCEMANAGER host-components存在时

Commands

Ambari当前支持的命令有:

INSTALL
UNINSTALL
START
RESTART
STOP
EXECUTE
ABORT
UPGRADE
SERVICE_CHECK
CUSTOM_COMMAND
ACTIONEXECUTE

Examples

Role Command Order Explanation
“HIVE_METASTORE-START”:[“MYSQL_SERVER-START”, “NAMENODE-START”] 启动Hive Metastore之前先启动MySQL和NameNode。
“MAPREDUCE_SERVICE_CHECK-SERVICE_CHECK”:[“NODEMANAGER-START”, “RESOURCEMANAGER-START”] MapReduce服务检查需要ResourceManager和NodeManager的启动。
“ZOOKEEPER_SERVER-STOP”:[“HBASE_MASTER-STOP”, “HBASE_REGIONSERVER-STOP”, “METRICS_COLLECTOR-STOP”] 在停止Zookeeper之前,应该先确保HBase Master、Hbase RegionServers和AMS Metrics收集器先停止。

Repositories

通过提供一个repos/repoinfo.xml(如 HDP-2.0.6),每个stack版本可以提供package的库的位置来使用。
repoinfo.xml文件中包含的库根据操作系统进行分组。每个os指定一个库列表,这些库列表会在stack版本安装时展示给用户。
这些库与packages defined in a service’s metainfo.xml配合使用,以便在系统上安装正确的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<reposinfo>
<os family="redhat6">
<repo>
<baseurl>http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.0.6.1</baseurl>
<repoid>HDP-2.0.6</repoid>
<reponame>HDP</reponame>
</repo>
<repo>
<baseurl>http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.17/repos/centos6</baseurl>
<repoid>HDP-UTILS-1.1.0.17</repoid>
<reponame>HDP-UTILS</reponame>
</repo>
</os>
<reposinfo>

baseurl - RPM库的URL,可以在这里找到repoid提供的软件。
repoid - baseurl地址使用的repo id。
reponame - 需要使用的repo的展示名。

Latest Builds

尽管repository基本URL能够对某个特定repo提供更新,但是必须在构建时定义它。当repository变更位置或更新包位于不同网站时,这就会成为一个问题。
对于这样的情况,stack-version能够提供一个JSON文件,来提供要使用的其他repo URL。
例如: HDP-2.3 repoinfo.xml uses file,它指出最新的构建包的repository URL。

json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
...
"HDP-2.3":{
"latest":{
"centos6":"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.3.6.0-3586/",
"centos7":"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos7/2.x/BUILDS/2.3.6.0-3586/",
"debian6":"http://s3.amazonaws.com/dev.hortonworks.com/HDP/debian6/2.x/BUILDS/2.3.6.0-3586/",
"debian7":"http://s3.amazonaws.com/dev.hortonworks.com/HDP/debian7/2.x/BUILDS/2.3.6.0-3586/",
"suse11":"http://s3.amazonaws.com/dev.hortonworks.com/HDP/suse11sp3/2.x/BUILDS/2.3.6.0-3586/",
"ubuntu12":"http://s3.amazonaws.com/dev.hortonworks.com/HDP/ubuntu12/2.x/BUILDS/2.3.6.0-3586/",
"ubuntu14":"http://s3.amazonaws.com/dev.hortonworks.com/HDP/ubuntu14/2.x/BUILDS/2.3.6.0-3586/"
}
},
...
}

Hooks

stack-version会有非常基本且通用的指令,这些指令需要在某个Ambari命令之前或之后运行。
避免将代码在service脚本之间复制并要求用户确认,通过将前置代码和后置代码放到hooks文件夹中,Ambari提供了Hooks的功能。(如:HDP-2.0.6)

Command Sub-Folders

hooks子文件夹的命名模式为”->”。
那意味着子文件夹中的scripts/hook.py文件是在命令之前运行还是之后运行。
Examples:

Sub-Folder Purpose Example
before-START hook脚本,会在stack-version的任何组件启动之前被调用 HDP-2.0.6 1、设置hadoop的日志和pid目录。2、创建javahome的symlink。3、创建/etc/hadoop/conf/topology_script.py脚本
before-INSTALL hook脚本,会在stack-version的任何组件安装之前被调用 HDP-2.0.6 1、在/etc/yum.repos.d中创建repo文件。 2、安装基本包,如curl、unzip等

Ambari当前支持的命令,根据需要可以创建如下的子文件夹

Prefix Command Details
before INSTALL  
before UNINSTALL  
before START  
before RESTART  
before STOP  
after EXECUTE  
after ABORT  
after UPGRADE  
after SERVICE_CHECK &nbps;
after < custom_command> 用户指定的自定义命令,如HDFS指定的DECOMMISSION或REBALANCEHDFS这两个命令。

script/hooks.py脚本应该导入resource_management.libraries.script.hook模块,并继承Hook类。

1
2
3
4
5
6
7
8
from resource_management.libraries.script.hook import Hook
class CustomHook(Hook):
def hook(self, env):
# Do custom work
if __name__ == "__main__":
CustomHook().execute()

Configurations

尽管大多数配置是在service级别设置的,但是也可以有适用于所有servies的配置,以便指示安装了此stack的集群的状态。
例如,像is security enabled?,what user runs smoke tests? 等。
这样的配置可以定义在sstack的configuration文件夹中。它们就像service级配置一样访问。

Stack Advisor

由于每个stack包含多个复杂的service,因此有必要动态确定services的布局以及确定某些配置的值。
stacks在services/目录中编写一个名为stack-advisor.py的Python脚本,使Ambari提供了Stack Advisor的能力。例如:HDP-2.0.6。Stack advisor脚本能够自动继承父级stack版本的stack advisor脚本。这允许较新的stack版本能够改变行为而不会影响之前的版本的行为。
Stack advisor在4个重要概念上提供了信息:

Recommend layout of services on cluster。
Recommend service configurations。
Validate layout of services on cluster。
Validate service configurations。

通过提供stack-advisor.py文件,能够动态的控制上面的每一项。
stack-advisor脚本的主要接口描述了上面每项应当如何调用,以及提供什么数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
class StackAdvisor(object):
"""
Abstract class implemented by all stack advisors. Stack advisors advise on stack specific questions.
Currently stack advisors provide following abilities:
- Recommend where services should be installed in cluster
- Recommend configurations based on host hardware
- Validate user selection of where services are installed on cluster
- Validate user configuration values
Each of the above methods is passed in parameters about services and hosts involved as described below.
@type services: dictionary
@param services: Dictionary containing all information about services selected by the user.
Example: {
"services": [
{
"StackServices": {
"service_name" : "HDFS",
"service_version" : "2.6.0.2.2",
},
"components" : [
{
"StackServiceComponents" : {
"cardinality" : "1+",
"component_category" : "SLAVE",
"component_name" : "DATANODE",
"display_name" : "DataNode",
"service_name" : "HDFS",
"hostnames" : []
},
"dependencies" : []
}, {
"StackServiceComponents" : {
"cardinality" : "1-2",
"component_category" : "MASTER",
"component_name" : "NAMENODE",
"display_name" : "NameNode",
"service_name" : "HDFS",
"hostnames" : []
},
"dependencies" : []
},
...
]
},
...
]
}
@type hosts: dictionary
@param hosts: Dictionary containing all information about hosts in this cluster
Example: {
"items": [
{
Hosts: {
"host_name": "c6401.ambari.apache.org",
"public_host_name" : "c6401.ambari.apache.org",
"ip": "192.168.1.101",
"cpu_count" : 1,
"disk_info" : [
{
"available" : "4564632",
"used" : "5230344",
"percent" : "54%",
"size" : "10319160",
"type" : "ext4",
"mountpoint" : "/"
},
{
"available" : "1832436",
"used" : "0",
"percent" : "0%",
"size" : "1832436",
"type" : "tmpfs",
"mountpoint" : "/dev/shm"
}
],
"host_state" : "HEALTHY",
"os_arch" : "x86_64",
"os_type" : "centos6",
"total_mem" : 3664872
}
},
...
]
}
Each of the methods can either return recommendations or validations.
Recommendations are made in a Ambari Blueprints friendly format.
Validations are an array of validation objects.
"""
def recommendComponentLayout(self, services, hosts):
"""
Returns recommendation of which hosts various service components should be installed on.
This function takes as input all details about services being installed, and hosts
they are being installed into, to generate hostname assignments to various components
of each service.
@type services: dictionary
@param services: Dictionary containing all information about services selected by the user.
@type hosts: dictionary
@param hosts: Dictionary containing all information about hosts in this cluster
@rtype: dictionary
@return: Layout recommendation of service components on cluster hosts in Ambari Blueprints friendly format.
Example: {
"resources" : [
{
"hosts" : [
"c6402.ambari.apache.org",
"c6401.ambari.apache.org"
],
"services" : [
"HDFS"
],
"recommendations" : {
"blueprint" : {
"host_groups" : [
{
"name" : "host-group-2",
"components" : [
{ "name" : "JOURNALNODE" },
{ "name" : "ZKFC" },
{ "name" : "DATANODE" },
{ "name" : "SECONDARY_NAMENODE" }
]
},
{
"name" : "host-group-1",
"components" :
{ "name" : "HDFS_CLIENT" },
{ "name" : "NAMENODE" },
{ "name" : "JOURNALNODE" },
{ "name" : "ZKFC" },
{ "name" : "DATANODE" }
]
}
]
},
"blueprint_cluster_binding" : {
"host_groups" : [
{
"name" : "host-group-1",
"hosts" : [ { "fqdn" : "c6401.ambari.apache.org" } ]
},
{
"name" : "host-group-2",
"hosts" : [ { "fqdn" : "c6402.ambari.apache.org" } ]
}
]
}
}
}
]
}
"""
pass
def validateComponentLayout(self, services, hosts):
"""
Returns array of Validation issues with service component layout on hosts
This function takes as input all details about services being installed along with
hosts the components are being installed on (hostnames property is populated for
each component).
@type services: dictionary
@param services: Dictionary containing information about services and host layout selected by the user.
@type hosts: dictionary
@param hosts: Dictionary containing all information about hosts in this cluster
@rtype: dictionary
@return: Dictionary containing array of validation items
Example: {
"items": [
{
"type" : "host-group",
"level" : "ERROR",
"message" : "NameNode and Secondary NameNode should not be hosted on the same machine",
"component-name" : "NAMENODE",
"host" : "c6401.ambari.apache.org"
},
...
]
}
"""
pass
def recommendConfigurations(self, services, hosts):
"""
Returns recommendation of service configurations based on host-specific layout of components.
This function takes as input all details about services being installed, and hosts
they are being installed into, to recommend host-specific configurations.
@type services: dictionary
@param services: Dictionary containing all information about services and component layout selected by the user.
@type hosts: dictionary
@param hosts: Dictionary containing all information about hosts in this cluster
@rtype: dictionary
@return: Layout recommendation of service components on cluster hosts in Ambari Blueprints friendly format.
Example: {
"services": [
"HIVE",
"TEZ",
"YARN"
],
"recommendations": {
"blueprint": {
"host_groups": [],
"configurations": {
"yarn-site": {
"properties": {
"yarn.scheduler.minimum-allocation-mb": "682",
"yarn.scheduler.maximum-allocation-mb": "2048",
"yarn.nodemanager.resource.memory-mb": "2048"
}
},
"tez-site": {
"properties": {
"tez.am.java.opts": "-server -Xmx546m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC",
"tez.am.resource.memory.mb": "682"
}
},
"hive-site": {
"properties": {
"hive.tez.container.size": "682",
"hive.tez.java.opts": "-server -Xmx546m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC",
"hive.auto.convert.join.noconditionaltask.size": "238026752"
}
}
}
},
"blueprint_cluster_binding": {
"host_groups": []
}
},
"hosts": [
"c6401.ambari.apache.org",
"c6402.ambari.apache.org",
"c6403.ambari.apache.org"
]
}
"""
pass
def validateConfigurations(self, services, hosts):
""""
Returns array of Validation issues with configurations provided by user
This function takes as input all details about services being installed along with
configuration values entered by the user. These configurations can be validated against
service requirements, or host hardware to generate validation issues.
@type services: dictionary
@param services: Dictionary containing information about services and user configurations.
@type hosts: dictionary
@param hosts: Dictionary containing all information about hosts in this cluster
@rtype: dictionary
@return: Dictionary containing array of validation items
Example: {
"items": [
{
"config-type": "yarn-site",
"message": "Value is less than the recommended default of 682",
"type": "configuration",
"config-name": "yarn.scheduler.minimum-allocation-mb",
"level": "WARN"
}
]
}
"""
pass

Examples:

Stack Advisor interface
Default Stack Advisor implementation - for all stacks
HDP(2.0.6) Default Stack Advisor implementation
YARN container size calculate
Recommended configurations - HDFS,YARN,MapReduce2, HBase (HDP-2.0.6),HBase (HDP-2.3)
Delete HBase Bucket Cache configs on smaller machines
Specify maximum value for Tez config

Properties

与stack的配置类似,大多属性都是在service级定义,然而也可以在stack-version级别定义全局属性来影响所有的services。
一些例子:stack-selector and conf-selector 或 stack versions certain stack features。这里的大多属性都是在Ambari 2.4版本引入的以影响stack信息参数化和促进common-service代码重用。
这些属性可以定义在stack的properties文件中的.json文件中。
stack属性的更多信息可以在Stack Properties section找到。

Widgets

暂无

Kerberos

之前我们已经在service级别介绍了Kerberos。
stack-version级别定义的Kerberos为所有的service提供了身份描述。

Examples:Smoke test user and SPNEGO user define in HDP-2.0.6

Stack Upgrades

暂无

Writing metainfo.xml

metainfo.xml是Ambari管理的service的定义,它描述了service的内容。它是service定义中最重要的文件。这一章来介绍metainfo.xml中的各个片段。

Structure

不重要的字段使用斜体显示。
描述一个service的顶级字段如下:

Field What is it used for Sample Values
name service的名字。这个名字必须是service所在Stack范围内唯一的。 HDFS
displayName service在UI中显示的名字。 HDFS
version service的版本。名字和版本一起确定了唯一的service。 2.1.0.2.0
components service的组件列表 < check out HDFS metainfo>
osSpecifics 指定service运行所需的操作系统 < check out HDFS metainfo>
commandScript 定义service check脚本 < check out HDFS metainfo>
comment service的简短描述 Apache Hadoop Distributed File System
requiredServices 该服务所需的前置服务 < check out HDFS metainfo>
configuration-dependencies service所需的其他配置文件(这些配置文件本身属于其他service) < check out HDFS metainfo>
restartRequiredAfterRackChange Rack变更后是否必须重启 true / false
configuration-dir 如果配置目录不是默认的configuration,则需要使用该项来指定 -

service/components - 一个service包含多个components。与component有关的字段有:

Field What is it used it Sample Values
name component的名字。 NameNode
dsplayName component的显示名。 NameNode
category component的类型。可选值为MASTER、SLAVE或CLIENT。 -
commandScript 应用的命令。 < check out HDFS metainfo>
cardinality 允许的实例个数。 MASTER一般设置为1-2, SLAVE一般设置为1+
reassignAllowed 是否允许component被重新分配或移动到另外的主机。 true / false
versionAdvertised component是否显示它的版本信息。回滚/升级时使用。 true / false
timelineAppid metrics收集时用来进行区分的id。 HDFS
dependencies component所依赖的其他component列表。 < check out HDFS metainfo>
customCommands 组件的自定义命令,有别与标准命令。 RESTART_LLAP (Check out HIVE metainfo)

service/osSpecifics - 操作系统包的名

Field What is it used for Sample Values
osFamily service对应的操作系统 any => all, amazon2015、redhat6、debian7
packages 部署这个service所需的packages列表 < check out HDFS metainfo>
package/name package的名字(会被yum\apt等命令使用) 如 hadoop-lzo

service/commandScript - service检查的脚本

Field What is it used for Sample Values
script 脚本的相对路径 scripts/service_check.py
scriptType 脚本的类型,当前纸支持PYTHON PYTHON
timeout 命令的超时时间 300

service/component/customCommand - component的自定义命令

name: 自定义命令的名字
commandScript: 实现自定义命令的脚本信息,它包含其他片段。
commandScript/script: 脚本的相对路径
commandScript/scriptType: 脚本的类型,目前只支持PYTHON。
commandScript/timeout: 命令的超时时间。

Sample metainfo.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
<metainfo>
<schemaVersion>2.0</schemaVersion>
<services>
<service>
<name>HBASE</name>
<displayName>HBase</displayName>
<comment>Non-relational distributed database and centralized service for configuration management &amp;
synchronization
</comment>
<version>0.96.0.2.0</version>
<components>
<component>
<name>HBASE_MASTER</name>
<displayName>HBase Master</displayName>
<category>MASTER</category>
<cardinality>1+</cardinality>
<versionAdvertised>true</versionAdvertised>
<timelineAppid>HBASE</timelineAppid>
<dependencies>
<dependency>
<name>HDFS/HDFS_CLIENT</name>
<scope>host</scope>
<auto-deploy>
<enabled>true</enabled>
</auto-deploy>
</dependency>
<dependency>
<name>ZOOKEEPER/ZOOKEEPER_SERVER</name>
<scope>cluster</scope>
<auto-deploy>
<enabled>true</enabled>
<co-locate>HBASE/HBASE_MASTER</co-locate>
</auto-deploy>
</dependency>
</dependencies>
<commandScript>
<script>scripts/hbase_master.py</script>
<scriptType>PYTHON</scriptType>
<timeout>1200</timeout>
</commandScript>
<customCommands>
<customCommand>
<name>DECOMMISSION</name>
<commandScript>
<script>scripts/hbase_master.py</script>
<scriptType>PYTHON</scriptType>
<timeout>600</timeout>
</commandScript>
</customCommand>
</customCommands>
</component>
<component>
<name>HBASE_REGIONSERVER</name>
<displayName>RegionServer</displayName>
<category>SLAVE</category>
<cardinality>1+</cardinality>
<versionAdvertised>true</versionAdvertised>
<timelineAppid>HBASE</timelineAppid>
<commandScript>
<script>scripts/hbase_regionserver.py</script>
<scriptType>PYTHON</scriptType>
</commandScript>
</component>
<component>
<name>HBASE_CLIENT</name>
<displayName>HBase Client</displayName>
<category>CLIENT</category>
<cardinality>1+</cardinality>
<versionAdvertised>true</versionAdvertised>
<commandScript>
<script>scripts/hbase_client.py</script>
<scriptType>PYTHON</scriptType>
</commandScript>
<configFiles>
<configFile>
<type>xml</type>
<fileName>hbase-site.xml</fileName>
<dictionaryName>hbase-site</dictionaryName>
</configFile>
<configFile>
<type>env</type>
<fileName>hbase-env.sh</fileName>
<dictionaryName>hbase-env</dictionaryName>
</configFile>
</configFiles>
</component>
</components>
<osSpecifics>
<osSpecific>
<osFamily>any</osFamily>
<packages>
<package>
<name>hbase</name>
</package>
</packages>
</osSpecific>
</osSpecifics>
<commandScript>
<script>scripts/service_check.py</script>
<scriptType>PYTHON</scriptType>
<timeout>300</timeout>
</commandScript>
<requiredServices>
<service>ZOOKEEPER</service>
<service>HDFS</service>
</requiredServices>
<configuration-dependencies>
<config-type>core-site</config-type>
<config-type>hbase-site</config-type>
<config-type>ranger-hbase-policymgr-ssl</config-type>
<config-type>ranger-hbase-security</config-type>
</configuration-dependencies>
</service>
</services>
</metainfo>

Ambari

发表于 2018-08-13   |   分类于 hadoop

本文用来记录Ambari的学习

Ambari的简单介绍

从Ambari的作用来说,它是用来创建、管理、监控Hadoop生态(例如hadoop、hive、hbase、Sqoop以及Zookeeper)集群的工具。Ambari就是为了让Hadoop已经相关的大数据软件更容易使用的一个工具。Ambari支持的平台组建也越来越多,如流行的Spark、Storm等计算框架,已经资源调度平台YARN等,都可以通过Ambari来轻松部署。

Ambari自称也是一个分布式架构的软件,主要由两部分组成:Ambari Server和Ambari Agent。用户通过Ambari Server来通知Ambari Agent来安装对应的软件;Agent会定时的发送各个机器每个软件模块的状态给Ambari Server,最终这些信息会呈现在Ambari的GUI中,方便用户了解集群中各个模块的状态,并进行维护。

Ambari的架构和工作原理

Ambari Server会读取Stack和Service的配置文件。当用Ambari创建集群的时候,Ambari Server传送Stack和Service的配置文件配以及Service生命周期的控制脚本到Ambari Agent。Agent拿到配置文件后,会下载安装公共资源里的软件包。安装完成后,Ambari Server会通知Agent去启动Service。之后,Ambari Server会定时发送命令道Agent检查Service的状态,Agent上报给Server并显示在Ambari的UI上。
Ambari Server支持其他API,这样能够很容易的扩展或定制Ambari。
如果有安全方面的要求,Ambari支持Kerberos认证的hadoop集群。

Ambari web:用户交互界面,通过HTTP发送使用Rest API与Ambari Server进行交互。
Ambari Server:Ambari服务器,用于和Web、Agent进行交互,并且包含了Agent的所有控制逻辑,Server产生的数据存储在DB中。
Ambari Agent:守护进程,主要包含节点状态与执行结果信息汇报给Server,以及接受Server操作命令的两个消息队列。
Host:安装实际大数据服务组件的物理机器,每台机器都有Ambari Agent和Metrcis Monitor守护进程服务。
Metrics Collector:主要包括将Metrics monitor汇报的监控信息存储到Hbase,以及提供给Ambari Server的查询接口。

Ambari的自定义命令

在Ambari的Stack中,每个Service都有start、stop、status和configure这样的命令,我们称为生命周期的控制命令。Service的每个模块必须实现这几个命令。为了让用户可以更好的控制每个service以及模块,Ambari支持了自定义命令。
具体的自定义命令配置在每个Service的metainfo.xml中。不过不同的模块类型,呈现在GUI的方式是不一样的。当一个service的Master模块增加一个自定义命令时,该命令会显示在该Service的Service Action List中。如果点击这个命令,Ambari Server就会通知Master所在机器的Agent,Agent就会执行该自定义命令的逻辑。当增加一个自定义命令给Slave或Client类型的Component,该命令会呈现在机器的Component页面。在哪个机器的Component页面点击该命令,Ambari Server就会通知对应机器的Agent调用这个自定义的命令接口。

Spark 2.3.1 Spark SQL DataFrames and DatasetsGuide

发表于 2018-08-10   |   分类于 spark 2.3.1

Spark SQL, DataFrames and Dataset Guide

Overview

Spark SQL是一个用于结构化数据处理的Spark模块。与Spark RDD API不同,由Spark SQL提供的这些接口在结构化数据和结构化计算执行方面提供了更多信息。在内部,Spark SQL使用了这个额外信息来执行额外的优化。有几种与Spark SQL交互的方法,包括SQL和Dataset API。当计算一个结果时,相同的计算引擎会被使用,与你执行计算使用的API/语言无关。这种统一意味着开发者能够轻松在那些提供更加原始的方式处理给定转换的不同API之间进行来回切换。
本篇中所有例子使用的样例数据包含在Spark中,并能够使用spark-shell、pyspark shell或sparkR shell来运行。

SQL

Spark SQL的一种用法时执行SQL查询。Spark SQL还能够被用来从Hive实例中读取数据。关于如何配置这个特性,请参考Hive Tables。当在另一种编程语言中执行SQL时,结果会作为一个Dataset/DataFrame来返回。你还能够使用command-line或JDBC/ODBC的方式与SQL接口进行交互。

Datasets and DataFrames

一个Dataset就是一个分布式数据集。Dataset作为一个新接口在Spark 1.6中被添加,它提供了RDD的优点(强类型、能够使用强大的lambda函数)和Spark SQL的优化执行引擎的有点。一个Dataset能够根据JVM对象来构造,然后使用函数转换(map、flatMap、filter)进行操作。Dataset的API在Scala和Java中时可用的。Python还不支持Dataset API。但是因为Python的动态特性,Dataset API的很多优点已经可用了(例如你可以很自然的通过名称来访问某一行的一个字段 row.columnName)。对于R语言也是如此。
一个DataFrame是一个带有列名的数据集。它在概念上等同于关系数据库中的一个表或者一个是R语言或Python语言中data frame,但是底层具更加优化。DataFrame可以根据各种资源进行构建,例如:结构化的数据文件、Hive中的表、外部数据库以及已经存在的RDD。DataFrame API在Scala、Java、Python和R语言中都可用。在Scala和Java中,一个DataFrame相当于一个有很多行的Dataset。在Scala API中,DataFrame相当于一个Dataset[Row]类型。而在Java API中,用户需要使用Dataset来表述一个DataFrame。
在本文中,我们将经常引用Scala/Java由有Row组成的Dataset来表述DataFrame。

Getting Started

Starting Point: SparkSession

Spark中,所有功能的切入点是SparkSession类。要创建一个基本的SparkSession,只需要使用SparkSession.builder()

1
2
3
4
5
6
7
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();

在Spark库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”目录下,查看完整的示例代码。
SparkSession是Spark 2.0的内置功能,用于提供Hive特性,包括用来写HiveQL查询、
访问Hive UDFs已经从Hive表中读取数据。要使用这些特性,你不需要配置Hive。

Creating DataFrames

使用SparkSession,application能够从一个已经存在的RDD、一个Hive表或Spark data sources来创建DataFrame。
作为一个例子,下面的代码根据一个JSON文件中的内容来创建一个DataFrame:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的代码,请查看Spark库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。

Untyped Dataset Operations(aka DataFrame Operations)

在Scala、Java、Python和R语言中,DataFrames针对不同的语言提供不同的结构化数据操作。正如上面提到的,在Spark2.0中,在Scala和Java的API中,DataFrames是以Dataset来表述的。这些操作也被称为“无类型转换”,与强类型转换的Scala/Java Dataset的类型形成对比。
这里,我们展示了使用Dataset进行结构化数据处理的基本示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+

完整的样例代码,查看Spark库的examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java。
在Dataset上能够执行的操作类型列表,可以查看API Document。
除了简单的列引用和计算外,Dataset还有一个丰富的函数库,包括字符串的操作、日期的计算以及常用的数学操作等。完整的列表可以在DataFrame Function Reference找到。

Running SQL Queries Programmatically

SparkSession上的sql函数使application能够执行SQL查询,并返回一个Dataset作为结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的代码,请查看Spark库中的 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Global Temporary View

在Spark SQL中,临时视图是session范围的,将会伴随着创建它的那个session的终止而消失。如果你想要跨session共享一个临时视图,并让它存活到application终止,你可以创建一个全局临时视图。全局视图与一个名为‘global_temp’的由系统保护的数据库进行绑定,我们必须使用这个特殊的名字来引用它,如:SELECT * FROM global_temp.view1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的代码,请查看“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。

Creating Datasets

Dataset与RDD类似,不同的是它没有使用Java序列化或Kryo,它们使用了一个特殊的Encoder来序列化对象,以便这些对象的处理或跨网络传输。虽然encoder和标准序列化器都能够将一个对象转换为字节,encoder是动态编码产生的,并且使用一种格式来允许Spark执行很多操作(filtering, sorting 和 hashing),而不需要讲字节反编译为对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的示例,请查看 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Interoperating with RDDs

Spark SQL支持两种不同方法来将存在的RDD转换为Dataset。第一种方法是使用反射来推导包含特殊类型对象的RDD的模式。这种反射的方法代码更加简单,而且如果在你写Spark application时已经知道了模式时,工作的会很好。
第二种方法是通过一个程序接口来创建Dataset,这个程序接口允许你构建一个模式,并且将它应用到一个已经存在的RDD上。但是这个方法比较冗长,它允许你只有在运行时才知道列和列类型时来构造Dataset。

Inferring the Schema Using Reflection

Spark SQL支持自动将一个JavaBean的RDD转换为一个DataFrame。BeanInfo使用反射机制获得,定义了表的模式。当前,Spark SQL不支持那些包含了Map类型字段的JavaBean,但是对于嵌套的JavaBean以及嵌套了List或Array类型的字段给予了充分的支持。你可以通过创建一个实现了Serializable接口以及为所有字段生成getter和setter方法的类来创建一个JavaBean。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

完整的代码,请查看 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Programmatically Specifying the Schema

当JavaBean无法提前定义时(例如,记录的结构被编码为一个字符串,或者一个文本数据集将被解析,但是其中的字段可能根据不同的用户而不一样),Dataset能够通过三个步骤来创建。

1、根据原生的RDD创建一个RDD。
2、创建一个与第一步骤RDD中Row结构匹配的StructType来描述的模式。
3、通过由SparkSession提供的createDataFrame方法,将这个模式应用到RDD。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+

完整的示例,请查看 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Aggregations

内置的DataFrame函数提供了常用的聚合操作,如count()、countDistinct()、avg()、max()、min()等。然而这些函数是为了DataFrame设计的,Spark SQL同样由类型安全的版本,以便其中一些被用到Scala和Java的强类型Dataset。此外,Spark没有限制用户预定义聚合函数,可以自己来创建聚合函数。

Untyped User-Defined Aggregate Functions

用户要实现无类型聚合函数,则需要继承UserDefinedAggregateFunction抽象类。例如,你一个用户自定义的平均数函数,看起来像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public static class MyAverage extends UserDefinedAggregateFunction {
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
// Data types of input arguments of this aggregate function
public StructType inputSchema() {
return inputSchema;
}
// Data types of values in the aggregation buffer
public StructType bufferSchema() {
return bufferSchema;
}
// The data type of the returned value
public DataType dataType() {
return DataTypes.DoubleType;
}
// Whether this function always returns the same output on the identical input
public boolean deterministic() {
return true;
}
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0L);
buffer.update(1, 0L);
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
// Calculates the final result
public Double evaluate(Row buffer) {
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
}
// Register the function to access it
spark.udf().register("myAverage", new MyAverage());
Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+

查看完整示例,请参考 examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java 。

Type-Safe User-Defined Aggregate Functions

强类型Dataset的用户自定义聚合围绕着Aggregator抽象类来解决。例如,一个类型安全的用户自定义平均数看起来是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
public static class Employee implements Serializable {
private String name;
private long salary;
// Constructors, getters, setters...
}
public static class Average implements Serializable {
private long sum;
private long count;
// Constructors, getters, setters...
}
public static class MyAverage extends Aggregator<Employee, Average, Double> {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
public Average zero() {
return new Average(0L, 0L);
}
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
public Average reduce(Average buffer, Employee employee) {
long newSum = buffer.getSum() + employee.getSalary();
long newCount = buffer.getCount() + 1;
buffer.setSum(newSum);
buffer.setCount(newCount);
return buffer;
}
// Merge two intermediate values
public Average merge(Average b1, Average b2) {
long mergedSum = b1.getSum() + b2.getSum();
long mergedCount = b1.getCount() + b2.getCount();
b1.setSum(mergedSum);
b1.setCount(mergedCount);
return b1;
}
// Transform the output of the reduction
public Double finish(Average reduction) {
return ((double) reduction.getSum()) / reduction.getCount();
}
// Specifies the Encoder for the intermediate value type
public Encoder<Average> bufferEncoder() {
return Encoders.bean(Average.class);
}
// Specifies the Encoder for the final output value type
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+

完整的示例,请看 examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java 。

Data Sources

Spark SQL通过DataFrame接口支持多种数据源的操作。DataFrame能够使用关系转换进行操作,也可以被用来创建一个临时视图。将DataFrame注册为一个临时视图,将允许你在视图的数据上运行SQL查询。这一章节描述了使用Spark Data Sources加载和保存数据的一般方法,然后介绍内置数据源可用的详细参数。

Generic Load/Save Functions

最简单的格式,默认数据源(默认是parquet, 除非通过spark.sql.soiurces.default配置修改过)将被用于所有操作。

1
2
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

查看完整示例,请参考 examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java 。

Manually Specifying Options

你还可以手动指定想要使用的数据源,以及传递给数据源任何额外的参数。数据源可以通过它的完整限定名(如:org.apache.spark.sql.parquet)来指定,但是对于内置的数据源,你也能够使用它的短名字(json、parquet、jdbc、orc、libsvm、csv、text)。从任何类型数据源加载的DataFrames,通过使用这个语句都可以转为其他类型。
要加载一个JSON文件,你可以使用:

1
2
Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

查看完整示例,请参考:xamples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
要加载一个CSV文件,你可以使用:

1
2
3
4
5
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");

查看完整示例,请参考:xamples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Run SQL on files directly

除了使用read API加载文件到DataFrame然后查询它之外,你还可以使用SQL直接查询那个文件。

1
2
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

查看完整示例,请参考:xamples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Save Modes

保存操作可以选择一种SaveMode,它指定了如何处理存在的数据。一件非常重要的事情是这些保存模式没有利用任何锁,并且它们不是原子操作。另外,当执行Overwrite模式时,已有的数据将会在写出新数据之前被删掉。

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error” or “errorifexists” (default) 当保存一个DataFrame到一个数据源时,如果数据已经存在,预计将抛出一个异常
SaveMode.Append “append” 当保存一个DataFrame到一个数据源时,如果数据或表格已经存在,DataFrame的内容将被追加到已存在数据
SaveMode.Overwrite “overwrite” Overwrite模式意味着,当保存一个DataFrame到一个数据源时,如果数据或表格已经存在,已存在的数据将会被DataFrame的内容所覆盖
SaveMode.Ignore “ignore” Ignore模式意味着当保存一个DataFrame到一个数据源时,如果数据已经存在,保存操作将不会保存DataFrame的内容,并且不会修改已经存在的数据。这个操作类似 CREATE TABLE IF NOT EXISTS

Saving to Persistent Tables

使用saveAsTable命令,DataFrames也可以作为持久化表被保存到Hive metastore中。注意,使用这个功能不需要现有Hive的部署。Spark将会为你创建一个默认的本地Hive metastore(使用Derby)。与createOrReplaceTempView命令不同,saveAsTable将显示DataFrame的内容并创建一个指向Hive metastore中数据的指针。持久化表将在你的Spark程序重启之后持续存在,只要你维持你的连接在相同的metastore。通过在SparkSession上调用table方法(并传递表的名字),就能根据持久化表创建对应的DataFrame。
对于基于文件的数据源,如:text、parquet、json等。通过path选项,你可以指定一个自定义表路径,如:df.write.option(“path”, “/some/path”).saveAsTable(“t”)。当这个表被删除,自定义表路径将不会被移除,并且表数据依然存在。如果没有指定自定义表路径,Spark将会把数据写到仓库目录下的默认表路径。当这个表被删除时,默认表路径也会一并被删除。
从Spark2.1开始,持久化数据源表格在Hive metastore中有独立的元数据。这样做又一些优点:

因为metastore只返回查询所需的partition,因此表上的首次查询就不需要查找所有的aprtition。
Hive DDL(如ALTER TABLE PARTITION … SET LOCATION),对于使用Datasource APi来创建表都是可用的。

注意,当创建外部数据源表时(那些带有path选项的),分区信息默认是不会被收集的。要同步分区信息到metastore中,你可以执行MSCK REPAIR TABLE。

Bucketing, Sorting and Partitioning

对于基于文件的数据源,还可以对输出进行分组并排序或分组并分区。分组并排序只对持久化表适用:

1
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

完整的代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
当使用Dataset API时,partitioning能够和save以及saveAsTable一起使用。

1
2
3
4
5
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");

完整的代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
可以对单个表使用partitioning和bucketing:

1
2
3
4
5
peopleDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed");

完整的代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
partitionBy创建了一个在Partition Discovery章节中描述的目录结构。因此,它对具有高基数的列的适用性有限。相比之下,BucketBy会跨固定数量的bucket来分布部署数据,and can be used when a number of unique values is unbounded.(!!!无法理解)

Parquet Files

Parquet时一种列式文件格式,它被很多其他数据处理系统所支持。Spark SQL对Parquet文件提供了读写支持,并能够自动保护原始数据的模式。当写Parquet文件时,为了兼容的原因,所有列被自动转换为nullable。

Loading Data Programmatically

使用上面例子中的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");
// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

完整示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Partition Discovery

在像Hive这样的系统中,常用的优化方法时进行表分区。在分区表中,数据通常存储在不同的目录中,根据分区列的值,编码到每个分区目录的路径中。所有内置文件源(包括Text/CSV/JSON/ORC/Parquet)都能够自动发现并推断分区信息。例如,我们能够将我们之前使用的数据存储到如下目录结构的分区表中,这个分区表使用两个额外的字段gender和country来作为分区字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...

通过将path/to/table传递给SparkSession.read.parquet或SparkSession.read.load,Spark SQL将自动从路径中获取分区信息。现在返回的DataFrame的模式变成:

1
2
3
4
5
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分区列的数据类型是自动推断的。当前支持数字数据类型、日期、时间戳和字符串类型。有些时候,用户可能不想自动推导分区列的数据类型。对于这种情况,自动类型推导能够通过配置项spark.sql.sources.partitionColumnTypeInference.enabled来配置,该配置默认值为True。当类型推导被禁用后,分区列将使用字符串类型。
从Spark1.6开始,分区发现默认只能查找给定路径下的。因此,对于上面的那个例子,如果用户传递path/to/table/gender=male给SparkSession.read.parquet或SparkSession.read.load,那么gender将不会被当成一个分区列。如果用户想要具体说明分区开始查找的基本目录,可以在数据源选项中设置basePath。例如,当数据目录为path/to/table/gender=male时,并且设置了basePath为path/to/table/,那么gender将会是一个分区列。

Schema Merging

和ProtocolBuffer、Avro以及Thrift一样,Parquet也支持模式演化。用户可以先从一个简单的schema开始,然后根据需要逐渐增加更多的列。通过这种方式,用户可能最终会得到不同但相互兼容的多个Parquet文件。Parquet数据源能够自动发现这种情况,并合并这些文件的schemas。
因为合并schema是一个成本相当高的操作,而且在很多情况是不必要的,因此从1.5.0开始,该功能默认是关闭的。你可以通过以下来启用它:

当你读区Parquet文件时,设置数据源选项 mergeSchema为true(下面的列子将展示)或者
设置全局SQL选项 spark.sql.parquet.mergeSchema为true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// Getters and setters...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// Getters and setters...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)

完整示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Hive metastore Parquet table conversion

当我们向Hive metastore Parquet table写数据或从中读数据时,Spark SQL奖尝试使用自己的Parquet支持来代替Hive SerDe以获取更好的性能。这个行为通过spark.sql.hive.converMetastoreParquet来配置,并且默认为打开的。

Hive/Parquet Schema Reconciliation

从表schema处理的角度来看,Hive和Parquet有两个主要区别:

1、Hive是不区分大小写的,而Parquet是区分大小写的。
2、Hive认为所有列nullable,而nullable在Parquet中很重要。

因为上面的原因,当我们将一个Hive metastore Parquet table转换为一个Spark SQL Parquet table时,我们必须将Hive metastore schema与Parquet schema调整一致。调整的规则为:

1、两个schema中相同名称的字段不管是否为空必须具有相同的数据类型。调整好的字段应当具有Parquet端的数据类型,因此nullable是具有意义的。
2、调整后的schema必须包含Hive metastore schema中定义的字段。
1)只出现在Parquet schema中的字段将从调整后的schema中删掉。
2)只出现在Hive metastore schema中的字段将被作为nullable字段添加到调整后的schema中。

Metadata Refreshing

Spark SQL为了更好的性能而缓存了Parquet metadata。当Hive metastore Parquet表转换启用时,那些被转换的表的metadata也会被缓存。如果这些表被Hive或其他外部工具更新了,你需要手动刷新它们以保证metadata的一致。

1
2
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");

Configuration

Parquet的配置可以通过两种方式完成,在SparkSession上使用setConf方法或使用SQL运行SET key=value。

Property Name Default Meaning
spark.sql.parquet.binaryAsString false 一些其他产生Parquet的系统,主要是Impala、Hive以及老版本的Spark SQL,这些系统在写Parquet schema时不区分二进制数据和字符串。这个标记告诉Spark SQL为这些系统将二进制数据按照字符串来进行兼容。
spark.sql.parquet.int96AsTimestamp true 一些其他产生Parquet的系统,特别是Impala和Hive,它们使用INT96来存储时间戳。这个标记告诉Spark SQL将INT96按照时间戳来解析,以便为那些系统提供兼容。
spark.sql.parquet.compressio.codec snappy 设置写Parquet文件的压缩编码器。如果没有在表详情的选项/属性中指定”compression”或”parquet.compression”。根据优先级排序:compression > parquet.compression > spark.sql.parquet.compression.codec。该选项可以使用的值有:none、uncompressed、snappy、gzip或lzo。
spark.sql.parquet.filterPushdown. true 当设置为True时,启用Parquet过滤器的push-down优化。
spark.sql.hive.converMetastoreParquet true 当设置为false时,Spark SQL将对parquet table使用Hive SerDe,而不是使用内置支持。
spark.sql.parquet.mergeSchema false 当设置为true时,Parquet数据源合并从所有数据文件收集的schema,如果是false,将从摘要文件中挑选schema,如果没有摘要文件可用,则随机选择一个文件。
spark.sql.optimizer.metadataOnly. true 当设置为true时,启用metadata-only查询优化,这个优化使用表的metadata来产生分区列,而不是通过对表扫描。当所有扫描过的列示分区列,且查询操作有一个满足distinct语意的聚合操作时,适用。

ORC Files

从Spark 2.3开始,Spark支持向量ORC reader,这个reader使用新的ORC文件格式来读取ORC文件。因此新增了如下配置。当spark.sql.orc.impl被设置为native且spark.sql.orc.enableVectorizedReader被设置为true时,向量读取器将用于读区原生的ORC表(这些表使用USING ORC语句创建)。对于Hive ORC serde表(使用USING HIVE OPTIONS),当spark.sql.hive.convertMetastoreOrc也被设置为true时,向量reader被使用。

Property Name Default Meaning
spark.sql.orc.impl hive ORC实现类的名字。可以是native和hive中的一个。native意味着对构建于Apache ORC 1.4.1上的原生ORC支持。hive意味着对Hive 1.2.1中的ORC库进行支持。
spark.slql.orc.enableVectorizedReader true 在native实现中启用向量化orc编码。如果为false,一个新的非向量化ORC reader被用于native实现。对于hive实现,本项可以忽略。

JSON Datasets

Spark SQL能够自动推导一个JSON dataset的schema并将它加载为一个Dataset。这个转换能够在一个Dataset上或一个JSON文件上使用SparkSession.read().json()来完成。
注意,提供的json文件不是一个典型的JSON文件。每一行必须是一个独立有效的JSON对象(其实这句话的意思就是,一个json数据必须独立一行,不能跨多行)。关于更多的信息,请查看JSON Lines text format, also called newline-delimited JSON.
要想解析多行JSON文件,需要设置multiLine选项为true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

完整的示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Hive Tables

Spark SQL还支持对Apache Hive读写数据。然而,因为Hive有很多的依赖,而这些依赖默认没有包含在Spark的发布中。如果Hive的依赖能够在classpath中找到,Spark将自动加载它们。注意,这些Hive依赖也必须在所有worker节点上存在,因为它们需要访问Hive的序列化和反序列化库(SerDes)以便访问Hive上存储的数据。
Hive的配置是通过替换conf/目录下的hive-site.xml、core-site.xml(安全配置)和hdfs-sit.xml(HDFS配置)来完成的。
当使用Hive工作时,必须实例化支持Hive的SparkSession,包括连接到已有的Hive metastore、支持Hive serdes以及Hive自定义函数。即使没有Hive环境也能够启用Hive支持。当没有通过hive-site.xml进行配置时,context自动在当前目录创建metastore_db,并创建一个由spark.sql.warehouse.dir配置指定的目录,默认目录在Spark application启动的当前目录中的spark-warehouse。注意hive-site.xml中的hive.metastore.warehouse.dir属性在Spark2.0.0中废弃了,取而代之是使用spark.sql.warehouse.dir来指定数据库在仓库中的位置。你可以需要为启动Spark appliction的用户开放写权限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...

完整示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java。

Specifying storage format for Hive table

当你创建一个Hive表时,你需要指定这个表应该如何从文件系统读写数据,例如”input format”和”output format”。你还需要定义这个表应该如何将data反序列化为row,或者如何将row序列化为data,如”serde”。下面的选项可以被用来指定存储格式(”serde”、”input format”、”output format”),如:CREATE TABLE src(id int) USING hive OPTIONS(fileFormat ‘parquet’)。默认情况下,我们将以简单文本的格式读取table。值得注意的是,在创建table的时候,存储handler还不被支持,你可以在Hive端使用存储handler来创建一个table,然后使用Spark SQL来读区它。

Property Name Meaning
fileFormat 用来说明文件格式的存储格式包,包括”serde”、”input format”和”output format”。当前我们支持6中文件格式:sequencefile、rcfile、orc、parquet、textfile和avro。
inputFormat\outputFormat 这两个选项用来指定”InputFormat“和”OutputFormat“类的名字,例如:org.apache.hadoop.hive.qllio.orc.OrcInputFormat。这两个选项应该成对出现,如果你设置了”fileFormat”选项,那么你不能分别指定它们。
serde 这个选项指定了一个serde类。当设置了‘fileFormat’选项时,如果给定的‘fileFormat’已经包含了serde信息,那么不要设置这个选项。目前,“sequencefile”、“textfile”和“rcfile”不包含serde信息,因此你可以为这3种文件格式设置此选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这个选项只能被用于”textfile”的文件格式。它们定义了文件的换行符。

其他属性使用OPTIONS进行定义,将作为Hive serde属性来考虑。

Interacting with Different Versions of Hive Metastore

Spark SQL的Hive支持的最重要部分是与Hive metastore的交互,它使Spark SQL能够访问Hive表中的metadata。从Spark 1.4.0开始,使用下面描述的配置,Spark有一个独立的包用来访问不同版本的Hive metadata。注意,无论要去访问的metastore的Hive是什么版本,在Spark SQL内部将针对Hive 1.2.1进行编译,并使用这些类作为内部执行(serdes、UDFs、UDAFs等)。
下面的选项能够被用来配置获取metadata的Hive的版本:

Property Name Default Meaning
spark.sql.hive.metastore.version 1.2.1 Hive metadata的版本。可用的选项从0.12.0到1.2.1
spark.sql.hive.metastore.jars builtin 被用于实例化HiveMetastoreClient的jar的位置。这个属性有三个选项:
1)builtin: 使用Hive 1.2.1,当-Phive被启用时,它与Spark assembly绑定。当这个选择了这个选项,spark.sql.hive.metastore.version必须是1.2.1或为定义。
2) maven:从Maven库中下载指定版本的Hive jars。这个配置对于生产环境通常不推荐。
3)JVM的标准classpath格式。这个classpath必须包含了Hive和它的依赖,以及对应的版本的Hadoop。这些jar只需要存在于driver上,但是如果你实在yarn资源管理器的集群上,那么你必须确保它们和你的application一起被打包。
spark.sql.hive.metastore. sharedPrefixes com.mysql.jdbc,org.postgresql, com.microsoft.sqlserver,oracla.jdbc 那些需要使用类加载器加载的用于在Spark SQL和指定版本的Hive之间共享的类前缀,类前缀是一个逗号分隔的列表。一个需要被共享的类就是JDBC driver,它需要访问metastore。其他需要共享的类是那些需要与已经共享类交互的类。例如,由log4j使用的自定义appender。
spark.sql.hive.metastore. barrierPrefixes (empty) Spark SQL所连接的每个版本的Hive都应明确加载的类的前缀,列表以逗号分隔。例如,通常需要被共享的Hive UDFs在一个前缀中被声明(如,org.apache.spark.*)

JDBC To Other Databases

Spark SQL还有一个数据源,可以使用JDBC从其他数据库读取数据。这个功能比使用jdbcRDD更加受欢迎。这是因为结果是作为一个DataFrame被返回,这样很容易的使用Spark SQL进行处理或与其他数据源相连接。JDBC数据源在Java或Python中使用起来也很容易,因为它不需要用户提供一个ClassTag。(注意,这不同于Spark SQL JDBC Server,Spark SQL JDBC Server允许其他application使用Spark SQL运行查询)
你需要在spark classpath中添加对应数据库的JDBC driver。例如,要从Spark shell连接到postgres,你应该运行如下命令:

1
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

使用Data Source API,远程数据库中的表可以被加载为一个DataFrame或Spark SQL临时视图。用户可以在数据源选项中指定JDBC的连接属性。连接通畅需要提供user和password属性,来登陆数据源。除了连接属性外,Spark还支持如下的选项,这些选项忽略大小写:

Property Name Meaning
url 进行连接的JDBC URL。特定数据源的连接属性可能会在URL中设置。如:jdbc:postgresql://localhost/test?user=fred&password=secret。
dbtable 要读取的JDBC表。注意,在SQL查询中的From子句中有效的任何东西,都能使用。例如,你可以在括号中使用子查询来代替全表。
driver 连接URL的JDBC driver的类名。
partitionColumn, lowerBound, upperBound 这些选项中的一个被指定,那么所有的都必须被指定。此外,numPartitions必须被指定。它们描述了多个worker并行读取表数据时,应该如何分区。partitionColumn必须是表中的数值列。注意,lowerBound和upperBound仅仅用来决定分区的幅度,而不是过滤表中的行。因此表中的所有行都将被分区并返回。这个选项只能被用于读取。
numPartitions 并行读写表的最大分区数。这也确定了JDBC连接的最大并发。如果写的分区数量超过了这个限制,我们可以在写数据之前调用coalesce(numPartitions)来减少它。
fetchsize JDBC的提取大小,它确定了每次通信能够取得多少行。它能够帮助提升那些默认fetch size低的JDBC dirver的性能(比如,Orache的fetch size为10)。这个选项只能用于读操作。
batch JDBC的batch大小,它确定了每次通信能够插入多少行。这能够帮助提升JDBC dirver的性能。这个选项只能用于写操作。默认值为1000。
isolationLevel 事务的隔离级别,应用于当前连接。它可以是:NONE\READ_COMMITTED\ READ_UNCOMMITTED\REPEATABLE_READ\SERIALIZABLE中的一个,通过JDBC连接对象来定义标准事务的隔离级别,默认为READ_UNCOMMITTED。这个选项只能用于写操作。请参考java.sql.Connection文档。
sessionInitStatement session初始化声明。在到远程数据库的session被打开之后,开始读取数据之前,这个选项执行一个自定义语句(PL/SQL块)。使用这个来实现session的初始化代码。例如:option(“色上司哦那I逆天S塔特闷它”, “”“BEGIN execute immediate ‘alter session set “_serial_direct_read”=true’; END; “””)
truncate 这是一个与JDBC writer相关操作。当启用了SaveMode.Overwrite,这个选项控制删除已存在的表,而不是先drop表然后再创建表。这个更加有效率,并且避免了表的metadata被删除。然而在某些情况下,它无法工作,如新数据有不同的schema。该选项默认值为false。这个选项只用于写操作。
createTableOptions 这是一个与JDBC writer相关的操作。如果设置,该选项允许在创建表的时候设置特定数据库表和分区的选项(如,CREATE TABLE T(name string) ENGINE=InnoDB)。这个选项只能被用于写操作。
createTableColumnTypes 当创建表时,用来代替默认的数据库列类型。数据类型信息使用与CREATE TABLE columns语句(如:”name CHAR(64), comments VARCHAR(1024)”)相同的格式被指定。被指定的数据类型应该是有效的spark sql数据类型。本选项只能用于写操作。
customSchema 自定义schema用于从JDBC连接中读取数据。例如,”id DECIMAL(38, 0), name STRING”。你也可以指定部分字段,其他的时候默认类型映射。例如:”id DECIMAL(38, 0)”。列名称应该与JDBC表的相关列名称一致。用户可以指定Spark SQL的相关数据类型,而不是使用默认的。这个选项只能被用于读操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

完整示例代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Troubleshooting

1、JDBC dirver类对于client session和所有executor的主类加载器是可访问的。这是因为Java的DriverManager会做一个安全检查,当DriverManager要打开一个连接时,检查结果会忽略所有主类加载器无法访问的driver。一个简便的方法是修改所有worker节点的compute_classpath.sh来包含你的driver JAR。
2、一些数据库,如H2,需要将所有名字转换为大写。你需要在Spark SQL中使用大写来引用那些名字。

Performance Tuning

通过将数据缓存到内存或开启一些创新选项,一些工作量是可以优化提升性能的。

Caching Data In Memory

通过调用spark.catalog.cacheTable(“tableName”)或dataFrame.cache(),Spark能够使用内存中列式格式来缓存表。Spark SQL将扫描需要的列,并自动调整压缩,以达到最小的内存使用和GC压力。你可以使用spark.catalog.uncacheTable(“tableName”),将table从内存中移除。
配置内存缓存可以通过两种方式来实现:在SparkSession上调用setConf方法,或者使用SQL来执行SET key=value命令。

Property Name Default Meaning
spark.sql. inMemoryColumnarStorage.compressed true 当设置为true的时候,Spark SQL将基于数据的统计自动为每一列选择一种压缩编码器。
spark.sql. imMemoryColumnarStorage.batchSize 10000 控制列式缓存的批量大小。较大的批量size会影响内存会提高内存的利用率和压缩,但是会产生内存溢出的风险。

Other Configuration Options

下面的选项也能够被用来提高查询的效率。随着Spark的优化,这些选项在未来可能会被废弃。

Property Name Default Meaning
spark.sql.files.maxPartitionbytes 134217728 (128 MB) 读取文件时,单个分区的最大字节数。
saprk.sql.files.openCostInBytes 4194304 (4 MB) 打开一个文件的成本,通过在同一时间能够扫描的字节数来测量。当推送多个文件到一个partition时非常有用。提高这个值会更好,这样写小文件的partition要比写大文件的partition更加快(写小文件的partitin优先调度)。
spark.sql.broadcastTimeout 300 broadcast连接的等待时间,以秒为单位。
spark.sql.broadcastJoinThreshold 10485760 (10 MB) 当执行join操作时,为那些需要广播到所有worker节点的表设置最大字节数。通过设置这个值为-1,广播操作可以被禁用。注意,当前的统计只支持那些运行了ANALYZE TABLE COMPUTE STATISTICS命令的Hive Metastore表。
spark.sql.shuffle.partitions 200 当为join或aggregation操作而混洗数据时,用来配置使用partitions的数量。

Broadcast Hint for SQL Queries

BROADCAST hint指导Spark在使用其他表或视图join指定表时,如何广播指定表。在Spark决定join方法时,broadcast hash join被优先考虑,即使统计高于spark.sql.autoBroadcastJoinThreshold的配置。当join两边都被指定了,Spark广播具有较低统计的那边。注意Spark不保证BHJ(broadcast hash join)总是被选择,因为不是所有的情况都支持BHJ。当broadcast nested loop join被选择时,我们仍然最重提示。

1
2
import static org.apache.spark.sql.functions.broadcast;
broadcast(spark.table("src")).join(spark.table("records"), "key").show();

Distributed SQL Engine

使用Spark SQL的JDBC/ODBC或command-line interface,Spark SQL也能够具有分布式查询引擎的行为。在这种模式中,终端用户或application能够直接与Spark SQL交互来运行SQL查询,而不需要写任何的代码。

Running the Thrift JDBC/ODBC server

Thrift JDBC/ODBC server实现了相当于Hive 1.2.1中的HiveServer2。你可以使用Spark或Hive1.2.1的beeline脚本来测试JDBC server。
要启动JDBC/ODBC server,在Spark目录中运行如下:

1
./sbin/start-thriftserver.sh

这个脚本接受所有bin/spark-submit命令的行的参数,并增加了一个–hiveconf选项用来指定Hive属性。你可以执行 ./sbin/start-thriftserver.sh –help来获取完整的可用属性列表。默认,这个server监听的是本地的10000端口。要想重写这个丢昂扣,你可以修改环境变量:

1
2
3
4
5
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...

或者修改系统属性:

1
2
3
4
5
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...

现在,你可以使用beeline来测试Thrift JDBC/ODBC server:

1
./bin/beeline

在beeline中连接JDBC/ODBC server可以使用:

1
beeline> !connect jdbc:hive2://localhost:10000

beeline将会询问你用户名和密码。在非安全模式中,输入你机器的用户名和空白的密码。对于安全模式,请遵循beeline documentation的指导。

通过替换conf/中hive-site.xml、core-site.mxl和hdfs-site.xml来完成Hive的配置。

你可能还需要使用Hive提供的beeline脚本。

Thrift JDBC server还支持通过HTTP协议发送thrift RPC messages。要启用HTTP模式,可以如下修改系统属性,或者修改conf中的hive-site.xml:

1
2
3
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要进行测试,使用beeline以http模式连接到JDBC/ODBC server:

1
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

Running the Spark SQL CLI

Spark SQL CLI是一个方便的工具用来在本地模式中运行Hive metastore服务并执行来自命令的查询输入。注意,Spark SQL CLI不能与Thrift JDBC server通信,
要启动Spark SQL CLI,在Spark目录中运行如下脚本:

1
./bin/spark-sql

通过替换conf/中hive-site.xml、core-site.mxl和hdfs-site.xml来完成Hive的配置。

Reference

Data Types

Spark SQL和DataFrame支持如下数据类型:

1、Numeric types
ByteType:声明一个一个字节的有符号的整型。数值范围从-128到127。
ShortType:声明一个两字节的有符号的整型。数值范围从-32768到32767。
IntegerType:声明一个四字节的有符号的整型。数值范围从-2147483648到2147483647。
LongType:声明一个八个字节的有符号的整型。数值范围从-9223372036854775808到9223372036854775807。
FloatType:声明一个四字节的单精度浮点数值。
DoubleType:声明一个八字节的双精度浮点数。
DecimlType:声明一个任意精度的有符号的十进制数值。内部由java.math.BigDecimal支持。一个DecimlType由一个任意精度的不能整型值和一个32位的整型组成。
2、Strubg type
声明一个字符串值。
3、Binary type
BinaryType:声明一个字节序列值。
4、Boolean type
BooleanType:声明一个boolean值。
5、Datetime type
TimestampType:声明一个由year、month、day、hour、minute和second字段的值组成。
DateType:声明一个由year、month和day字段的值组成。
6、Complex types
ArrayType(elementType, containsNull):声明一个elementType类型序列。containsNull用来检测ArrayType中是否包含null的值。
MapType(keyType, valueType, valueContainsNull):由一组key-value对组成。key的数据类型由KeyType来描述,value的数据类型由valueType来描述。对于MapType的一个值,keys不允许为null。valueContainsNull
被用来检测MapTypte的values中是否包含null值。
StructType(fields):StructFields(fields)序列。
StructField(name, datatype, nullable): StructType类型的字段。字段的名称通过name指定。字段的数据类型通过datatype来指定。nullable用来决定这个fields的values是否可以有null。

Spark SQL的所有数据类型都位于org.apache.spark.sql.types包中。要访问或创建一种数据类型,请使用org.apache.spark.sql.types.DataTypes中提供的接口方法。

Data type Value type in Java API to access or create a data type
ByteType byte or Byte DataTypes.ByteType
ShortType short or Short DataTypes.ShortType
IntegerType int or Integer DataTypes.IntegerType
LongType long or Long DataTypes.LongType
FloatType float or Float DataTypes.FloatType
DoubleType double or Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType() DataTypes.createDecimalType(precision, scale)
StringType String DataTypes.StringType
BinaryType byte[] DataTypes.BinaryType
BooleanType boolean or Boolean DataTypes.BooleanType
TimestampType java.sql.Timestamp DataTypes.TimestampType
DateType java.sql.Date DateTypes.DateType
ArrayType java.util.List DataTypes.createArrayType(elementType) 注意:containsNull的值为true。
MapType java.util.Map DataTypes.createMapType(keyType, valueType) 注意,valueContainsNull的值将为true
StructType org.apache.spark.sql.Row DataTypes.createStructType(fields)
StructField The value type in Java of the data type of this field DataTypes.createStructField(name, dataType, nullable)

spark_2.3.1_QuickStart

发表于 2018-08-10   |   分类于 spark 2.3.1

Quick Start

本指南快速的介绍如何使用Spark。我们将通过Spark的交互式shell(用Python或Scala)首先引入API,然后展示如何用Java、Scala和Python写application。
要遵循这个指南,首先需要从Spark的网站上下载Spark包。因为我们不使用HDFS,因此你可以现在任何版本的Hadoop。
注意,在Spark 2.0之前,Spark的主要程序接口是Resillent Distributed Dataset(RDD)。在Spark 2.0之后,RDD被Dataset所代替,Dataset类似于RDD的强类型,但是底层有更佳丰富的优化。RDD接口仍然被支持,你可以在RDD programming guide。然而,我们高度推荐你使用Dataset,它比RDD有更好的性能。查看SQL programming guide 以获取更多关于Dataset的详细信息。

Interactive Analysis with the Spak Shell

Basics

Spark的shell提供了简单的方式来学习API,以及一种强大的工具来交互式的分析数据。可以通过Scala(它运行在Java虚拟机上,因此它是学习已有Java库的很好方式)或Python来使用。通过在Spark目录下运行如下脚本来启动:

1
./bin/spark-shell

Spark的主要抽象是一个名为Dataset的分布式项目(数据条目–一条条的数据)集合。Dataset可以通过Hadoop InputFormates(如HDFS文件)来创建,或者由其他Dataset来转换。我们根据Spark源目录下README文件中的文本来创建一个新的Dataset:

1
2
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

通过调用一些action,你可以直接冲Dataset获取值,或者将这个Dataset转换为另一个新的Dataset。对于更多的细节,请查看API doc。

1
2
3
4
5
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

现在,我们将这个Dataset转换为一个新的。我们调用filter,将会返回一个包含文件子集合的新的Dataset。

1
2
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我们可以将转换和action串联在一起:

1
2
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

More on Dataset Operations

Dataset的转换和action可以被用于更加复杂的计算。假设我们要找出含有打你最多的一行:

1
2
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

它首先将一个行映射为一个数值,这创建了一个新的Dataset。reduce在Dataset上被调用,用来找到最大的数。map和reduce的参数是Scala的函数(闭包),也可以使用任何语言的特性或Scala/Java库。例如,我们在任意地方调用函数的声明(引入)。我们将使用Math.max()函数来使代码更加容易理解:

1
2
3
4
5
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一个常用的数据流是MapReduce。Spark能够很轻松的实现MapReduce流:

1
2
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,我们调用flatMap将行的Dataset转换为一个单词的Dataset,接着利用groupbyKey和count的组合来计算每个单词在文件中出现的次数(String, Long对)从而生成一个新的Dataset。要在shell中收集单词的数量,我们可以调用collect:

1
2
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Pyhon,2), (agree,1), (cluster.,1), ...)

Caching

Spark还支持将数据集合缓存到集群端内存缓存中。这在数据被反复访问时非常有用,例如当查询一个非常热门的数据集时,又或是在运行一个类似PageRank这样的迭代算法时。作为一个简单的例子,我们将linesWithSpark数据进行缓存:

1
2
3
4
5
6
7
8
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15

使用Spark来分析并缓存一个100行的文本开起来很愚蠢。有意思的是,这些相同的函数可以被用在非常大的数据集上,即使它们跨越数十个甚至数百个节点。你可以通过连接bin/spark-shell到一个集群来进行交互式操作,就像RDD programming guide中描述的。

Self-Contained Applications

假设我们想要使用Spark API写一个自包含的application。我们将使用Scala(利用sbt)、Java(利用Maven)和Pyton(利用pip)来实现一个简单的application。
这里我们将使用Maven来构建一个application JAR,其他类似的构建系统也可以。
我们将创建一个非常简单的Spark application,SimpleApp.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}

这段代码用来计算Spark README文件中包含’a’的行数,和包含’b’的行数。注意你需要将YOUR_SPARK_HOME替换为Spark的安装位置。和之前使用Spark shell不同,Spark shell会初始化它自己的SparkSession,而在代码中初始化SparkSession是程序的一部分。
要构建这个程序,我们还需要写一个Maven的pom.xml文件,在这个文件中列出Spark的依赖。注意Spark的依赖和Scala的版本要对应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
</project>

我们根据规范列出了Maven的目录结构:

1
2
3
4
5
6
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在我们可以使用Maven进行打包,并使用./bin/spark-submit来执行它。

1
2
3
4
5
6
7
8
9
10
11
12
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Where to Go from Here

恭喜你运行了自己的第一个Spark application!

对于API的更深了解,可以从RDD programming guide和SQL programming guide或者查看 ‘Programming Guides’菜单来了解其他组件。
想要在集群上运行application,去deployment overview。
最后,Spark在examples目录中包含了一些例子(Scala, Java, Python, R)。你可以如下运行它们:

1
2
3
4
5
6
7
8
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

spark_2.3.1_MonitoringAndInstrumentation

发表于 2018-08-10

Spark 2.3.1 Cluster Mode Overview

发表于 2018-08-10   |   分类于 spark 2.3.1

Cluster Mode Overview

本文档对Spark如何在集群上运行给出了一个简短的浏览,以便更加容易理解相关组件。通过看application submission guide来学习关于在集群上启动一个applicaiton的信息。

Components

Spark的application作为一组独立的进程在集群上运行,通过你主程序(被称为driver)中的SparkContext对象来协调合作。
具体来说,要在集群上运行application,SparkContext能够连接到某种类型的集群管理器(Spark自己的standalone集群管理器、Mesos或YARN),集群管理器能够跨application分配资源。一旦连接成功,Spark得到集群节点上的executor,这些executor为你的application执行计算以及存储数据。接下来SparkContext发送你的application代码(由传递给你SparkContext的JAR或Python文件定义)到executor。最终,SparkContext发送任务到executor来运行。

此处是图片
关于这个结构,有一些有用的东西需要注意:

1、每个application会得到自己的executor进程,这些executor在这个application持续期间保持不变并以多线程运行任务。这样的好处是application彼此隔离,无论是在调度方面(每个driver调度它自己的任务)还是executor方面(来自不同application的任务运行在不同的JVM中)。因此,这也意味着数据在不同的Spark application之间是不能共享的,除非借助其他外部存储。
2、Spark与底层集群管理器无关。只要它能够得到executor进程,并且它们能够彼此通信,这样即使是在支持其他application的集群管理器上也能相对简单的运行。
3、在driver程序整个生命周期内,它必须监听并接受来自它的executor的连接(查看网路配置章节spark.driver.port)。因此,driver程序对于它的worker节点来说必须是可以迅指的。
4、因为driver在集群上调度任务,因此它应该靠近worker节点,最好是在相同的局域网内。如果你想要远程向集群发送请求,最好为你的driver打开一个RPC,让它就近提交,而不是在远离worker节点的地方运行driver。

Cluster Manager Types

系统当前支持3种集群管理器:

Stangalone - Spark自带的一种集群管理器,使用它能够很容易的构建集群。
Apache Mesos - 一个很普遍的集群管理器,它还能够运行Hadoop的MapReduce和服务应用。
Hadoop YARN - Hadoop2种的资源管理器。
Kubernetes - 一个开源的系统,用于自动部署、扩展以及管理application。

Submitting Applications

使用spark-submit脚本可以将application提交到任何类型集群。application submission guide描述了应该如何做。

Monitoring

每个driver程序都有一个Web UI,端口一般是4040,这个web UI展示了运行的任务、executor已经存储的使用情况。通过在浏览器中输入http://:4040就可以访问这个UI。monitoring guide描述了其他的监控项。

Job Scheduling

spark给出了两种资源分配,一种是跨applications(在集群管理器级别上),一种是application中(在相同SparkContext上出现多次计算的情况)。job scheduling overview描述了详细信息。

Glossary

下面的表格列出了常用的一些集群概念:

Term Meaning
Application 在Spark上构建的用户程序。由一个driver程序和集群上的executors组成。
Application jar 一个包含了用户的Spark application的jar。在某些情况下用户可能想要创建一个”uber jar”来包含他们的application以及依赖。用户的jar应当不要包含Hadoop或Spark的库,因为这些库会在运行时自动被添加。
Driver program 运行application的main函数并创建SparkContext的进程。
Cluster 一个额外的服务,用来获取集群上的资源(如standalong manager、Mesos或YARN)。
Deploy mode 用来区分在哪里运行driver进程。在“cluster”模式中,系统在集群内部启动driver。在“client”模式中,在集群之外启动driver。
Worker node 集群中任何可以运行application的节点。
Executor 在worker节点上启动的用来处理application的进程,它执行任务并在内存或磁盘上保存数据。每个application都有自己的exectors。
Task 发送给executor的一个工作单元。
Job 由多个tasks组成的一个并行计算,并为一个spark action产生结果(如 save、collect)。 你将会在driver的日志中看到它们。
Stage 每个job被划分为一更小的task,称为stage,这些stage相互依赖(类似MapReduce中map阶段和reduce阶段)。你将会在driver的日志中看到他们。

Spark 2.3.1 Submit Applications

发表于 2018-08-09   |   分类于 spark 2.3.1

Submitting Applications

Spark的bin目录下的spark-submit脚本用于在一个集群上启动一个应用。通过一个统一的接口,它可以使用所有Spark支持的集群管理器,因此你不需要针对每种集群管理器来单独配置你的应用。

Bunding Your Application’s Dependencies

如果你的代码依赖其他项目,那么你需要将它们和你的应用一并打包,以便分发代码到一个spark集群。要完成这些,需要创建一个assembly jar(uber jar)来包含你的代码和代码的依赖。sbt和Maven都有assembly插件。当创建assembly jar时,排除Spark和Hadoop提供的依赖,因为这些不需要绑定,因为这些将由集群管理器在运行时提供。一旦你弄好了assembly jar,你就可以如下所示在调用 bin/spark-submit脚本是传递你的jar。
对于Python,你可以使用spark-submit的–py-files参数来添加.py、.zip或.egg文件,让他们和你的应用一起分发。如果你依赖多个python文件,我们推荐将他们打到一个.zip或.egg包中。

Launching Applications with spark-submit

一旦一个用户应用被绑定,就可以使用bin/spark-submit脚本来启动这个应用。这个脚本负责设置Spark的classpath和它依赖,而且脚本支持由Spark支持的不同的集群管理器和部署模式。

1
2
3
4
5
6
7
8
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

一些常用的选项:

–class:你应用的执行程序(如:org.apache.spark.example.SparkPi)
–master:集群的master URL(如:spark://23.195.26.187:7077)
–deploy-mode:在worker节点(cluster)上部署你的driver还是在本地作为一个额外的客户端(client)来部署。默认是client。
–conf:以key=velue格式配置的任意的Spark配置属性。对于包含空格的值,使用双引号包含起来,如”key=value”。
application-jar:指向你的应用程序和它依赖的jar的路径。这个URL必须是你集群内部全局可见,例如,一个hdfs://路径或一个在所有节点上都存在的file://路径。
application-arguments:任何需要传递给你的主类的主方法的参数。

常见的部署策略是,在一个与你的worker机位置相同的gateway机器上提交你的应用。在这种设置中,client模式是合适的,driver在spark-submit进程中被直接启动,这种方式像是集群的一个client。这个应用的输入和输出被打印到控制台。因此这种模式特别适合那些涉及REPL的应用。

此外,如果你的应用是用一个远离worker机器的机器上提交的,通常使用cluster模式来降低drivers和executors之间的网络传输。目前,standalone模式还不能够为Python应用提供cluster模式。

对于Python应用,在处传递一个.py来代替一个jar,在–py-files中添加.zip、.egg或.py,作为搜索目录。

这里有一些选项可用,用来指定使用的集群管理器。例如,对于cluster部署模式的standalone管理器管理的Saprk集群,你可以指定 –supervise 来保证在非0退出代码时,driver被自动重启。要枚举spark-submit所有可用的选项,使用–help运行spark-submit。这里有些常用的选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000
# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://xx.yy.zz.ww:443 \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
http://path/to/examples.jar \
1000

Master URLs

传递给Spark的master URL可以是下面格式中的一个:

Master URL Meaning
local 使用单个线程以本地模式运行Spark
local[K] 使用K个线程以本地模式运行Spark
local[K, F] 使用K个线程以本地模式运行Spark,允许最多F个失败
local[*] 使用与你机器逻辑核数相同的线程,以本地模式运行Spark
local[*, F] 使用于你机器逻辑核数相同的香橙,以本地模式运行Spark,允许最多F个失败
spark://HOST:PORT 连接到给定的以standalone模式运行的集群的Master。端口必须是你的master所配置使用的,默认为7077
spark://HOST1:PORT1,HOST2:PORT2 连接到使用了Zookeeper以standalone模式运行的带有standby master的集群。这个列表必须包含了使用Zookeeper配置的高可用集群的所有master的host。端口必须是你的master所配置使用的,默认为7077。
mesos://HOST:PORT 连接到给定的以MESOS模式运行的集群。端口必须是你的配置中使用的,默认为5050。或者,对于使用了Zookeeper的Mesos集群,使用mesos://zk://…配合–deploy-mode cluster来提交,HOST:PORT应该被配置为连接到MesosClusterDispatcher。
yarn 以cluster或client模式连接到yarn集群,连接模式通过 –deploy-mode来指定。这个集群的位置将基于HADOOP_CONF_DIR或YARN_CONF_DIR变量来找到。
k8s://HOST:PORT 以cluster模式连接到Kubernetes集群。Client模式当前还不支持,将会在未来被支持。HOST和PORT指向[Kubernetes API Server]。默认使用TLS连接。想要强制使用不安全的连接,你可以使用k8s://http://HOST:PORT。

Loading Configuration from a File

spark-submit脚本能够从一个属性文件中加载默认的Spark配置属性值,并传递它们到你的应用。默认它将从
Spark目录的conf/spark-defaults.conf中读取选项。
加载默认Spark配置,这种方式可以避免给spark-submit设置有确切值的选项(有些选项的值是固定的)。例如,如果设置了spark.master属性,你就可以在spark-submit中忽略–master项了。通常,在SparkConf中设置的值具有最高优先级,其次是传递给spark-submit的值,最后是默认文件里的值。

如果你无法确认配置项的值来自哪里,你可以在运行spark-submit是使用-verbose选项,将细粒度的调试信息打印出来。

Advanced Dependency Management

在使用spark-submit的时候,应用程序jar以及使用–jars选项包含的人和jar将会自动传输到集群。–jars后面提供的URLs必须以逗号分隔。那个列表被包含在driver和executor的classpath中。目录范围在–jars中不起作用。
Spark使用如下的URL模式来允许不同的策略传递jar:

file: 绝对路径,并且file:/ URLs由driver的HTTP文件服务提供服务,每个executor从driver的HTTP服务拉取文件。
hdfs:、http:、https:、ftp: 这些按照期望的那样从URI拉取文件和Jars。
local: 一个以local:/开头的URI,希望作为每个worker节点上的本地文件而存在。这意味着将不会发生网络IO。这种适用于将较大文件或jar推送到每个worker或通过NFS、GlusterFS等共享较大文件或Jar的方式。

注意,JARs和文件会为每个运行在executor节点上的SparkContext拷贝一份到工作目录。随着时间的推移,这将耗费大量的空间,因此需要清理。对于使用YARN的方式,清理将会自动方式;对于使用standalone方式的,自动清理工作可以通过spark.worker.cleanup.appDataTtl属性配置。

用户还可以通过使用-packages提供以逗号分隔的Maven坐标列表来包含任何其他依赖。使用此命令时,所有传递的依赖都将被处理。另外,使用–repositories选项,还可以用来添加maven库。多个库之间使用逗号分隔。这些命令可以被pyspark、spark-shell以及spark-submit来使用来包含Saprk包。
对于Python,–py-files选项可以被用来分发.egg、.zip以及.py文件到executors。

More Information

一旦你部署了你的应用,cluster mode overview 描述了分布式执行中的各个组件,以及如何监控和调试应用。

Spark 2.3.1 Overview

发表于 2018-08-09   |   分类于 spark 2.3.1

Spark Overview

Apache Spark是一个很快的用于一般目的的集群计算系统。它在Java、Python和R语言上提供了高级别的API,并且提供了一个支持一般图计算的优化引擎。它还提供了一组丰富的高级别的工具,包括SQL和结构化数据处理所需要的Spark SQL、机器学习所需要的MLlib、图处理所需要的GraphX以及Spark Streaming。

Downloading

从项目网站的下载页获取Spark。这个文档为是针对的Spark2.3.1版本。Spark为了使用HDFS和YARN使用了Hadoop客户端库。这个下载中预置了一些常用的Hadoop版本。用户还可以下载一个”Hadoop free”库通过Spark的classpath指定Hadoop版本来运行Spark。Scala和Java用户可以在自己的项目的中使用Spark的Mave依赖来包含Spark,而Python用户在未来也可以从PyPI中安装Spark。

如果你喜欢从源码构建Spark,可以通过这个链接来操作。

Spark能够运行在Windowns和类UNIX的系统上。在一台机器上以本地模式运行很容易–你需要做的事情就是在你的系统路径中安装java或者在环境变量JAVA_HOME中指向Java的安装。

Spark运行在Java 8+, Python 2.7+/3.4+或R 3.1+上。对于Scala API,Spark2.3.1使用的是Scala2.11。你需要使用一个合适Scala版本(Scala2.11+)。

注意,对于Java 7、Python 2.6以及2.6.5以前的Hadoop版本的支持,已经在Spark 2.2.0中移除。对于Scala2.10版本的支持在Spark 2.3.0中移除了。

Running the Examples and Shell

Spark带有一些简单的样例程序。Scala、Java和R的样例都在examples/src/main目录下。想要运行一个Java或Scala样例程序,需要使用顶级Spark目录下bin/run-example [params]。如:

1
./bin/run-example SparkPi 10

你还可以通过Scala shell的一个修改版,以交互的方式运行Saprk。这对于学习这个框架是很好的:

1
./bin/spark-shell --master local[2]

其中的–master选项指定了一个分布式集群的master的URL,或者使用一个线程以本地模式运行,或者local[N]表示使用N个线程以本地模式运行。你可以从使用本地模式做测试来开始。对于选项的全部列表,使用使用–help选项来运行Spark shell。
Spark还提供了一个Python的API。想要在Python解析器中以交互方式运行Spark,可以使用 bin/pyspark:

1
./bin/pyspark --master local[2]

样例application也提供了Python版本。如:

1
./bin/spark-submit examples/src/main/python/pi.py 10

从Spark1.4开始Spark也提供了R API的样例。要以R解析器中以交互方式运行Spark,可以使用 bin/sparkR:

1
./bin/sparkR --master local[2]

样例程序同样也提供了R语言版本的,如:

1
./bin/spark-submit examples/src/main/r/dataframe.R

Hive Study

发表于 2018-07-13   |   分类于 bigdata

本文用来记录自己在使用Hive Sql方面的一些经验。

创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
# 创建了一个带有两个分区的表,这个表按照partition_date和hour进行分区
CREATE EXTERNAL TABLE `user.user_action`(
`action` string COMMENT '{"chs_name":"", "description":"","etl":"","value":"","remark":""}',
`num` double comment '{"chs_name":"", "description":"","etl":"","value":"","remark":""}'
)
PARTITIONED BY ( `partition_date` string COMMENT '分区日期', `hour` string COMMENT '小时')
ROW FORMAT DELIMITED
--TODO: 导入MYSQL的表建议'\t'分隔
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED as textfile;

查询数据并将数据写入到表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
INSERT OVERWRITE TABLE user.user_action
partition(partition_date = '20180602', hour='0')
select action,
sum(num) as n
from (
select action,
num
from (
select momo_id,
event_num_map
from db.event_summary
where partition_date = '20180602'
and size(event_num_map)>0
)a
LATERAL VIEW EXPLODE(event_num_map)t AS action, num
)b
group by action;

该表从db.event_summary中查询数据然后吸入到user.user_action表中。需要注意db.event_summay中的event_num_map字段是一个map,map的key是action,value是action的数量。这里使用了一个函数LATERAL VIEW EXPLODE,用来map展开。

一些常用函数

ROW_NUMBER() OVER()函数

ROW_NUMBER() OVER()函数用来为每条记录返回一个行号,可以用来对记录进行排序并返回该序号,需要从1开始排序。
OVER()是一个聚合函数,可以对记录进行分组和排序。ROW_NUMBER()不能单独使用,必须搭配OVER()才能使用,否则会报错。

1
select *, row_number() over() as r from mytable;

配合partition by/order by
按照某个字段排序后返回行号

1
select *, row_number() over(partition by aaaaab order by num desc) r from mytable;

按照aaaaab分组后,并根据aaaaaab进行倒序排列。

SQL中的类型转换

需要使用cast()函数进行类型转换。

cast(str_column as int)

一些经验的总结

一个表中分时段记录内容的统一查询

需求

遇到的情况是这样的,有一个表A,表A中有24个字段(event_0_map … event_24_map)用来记录对应小时内每个用户各自发生的一些事情的数量。表结构如下:

1
2
3
4
5
id string = 1000010
event_0_map = {'event0':200, 'event2':100}
...
event_24_map = {'event0':500, 'event2':800}
partition = '20180101'

现在有一个需求:需要统计每个小时发生事件最多的前100个事件

分析

因为是每个小时执行的任务,而且每个小时的数据是存放在不同的字段里面,而字段名在SQL中是不可以拼接的,如:event_24_map,无法来拼接,因此有两种方案。

方案一

生成24个任务,每个任务的SQL都一样,只是查询的字段不一样

1
2
3
4
5
6
7
8
9
10
select action,
num
from (
select id,
event_0_map
from online.tableA
where partition = '${partition_date}'
and size(event_0_map)>0
)a0
LATERAL VIEW EXPLODE(event_0_map)t AS action, num

方案二(推荐)

将所有的字段同时解析,生成一个大表,再对大表进行过滤查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
select action,
num
from (
select action,
num,
'00' as hour
from (
select id,
event_0_map
from online.tableA
where partition = '${partition_date}'
and size(event_0_map)>0
)a0
LATERAL VIEW EXPLODE(event_0_map)t AS action, num
union all
select action,
num,
'01' as hour
from (
select id,
event_2_map
from online.tableA
where partition = '${partition_date}'
and size(event_2_map)>0
)a1
LATERAL VIEW EXPLODE(event_2_map)t AS action, num
) data
where hour = '${partition_hour}'

表的删除和恢复

在使用Hive的表的过程中,难免会有对表进行删除的情况,其实把表删除后,数据文件还是存在的,那么如何将数据按照新表的结构恢复一下呢?可以如下操作,但是需要注意的是,对于新增的字段,值是NULL。

1
2
3
4
5
drop table db.table_test;
...
create table xxx...
...
MSCK REPAIR TABLE db.table_test;

Easy Use Mapreduce

发表于 2018-07-10   |   分类于 bigdata

本文用来记录MR的使用,已经遇到的一些问题和解决方法

#使用Python执行MR

Mapper的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import json
import time
#加载编码
reload(sys)
sys.setdefaultencoding('utf-8')
for line in sys.stdin:
j = json.loads(line.strip())
print "%s\t%s" % (j.get("name"), j.get("age"))

Reducer的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import json
import time
#加载编码
reload(sys)
sys.setdefaultencoding('utf-8')
uri_count = {}
for line in sys.stdin:
data = line.strip().split("\t")
key = "%s-%s" % (data[0], data[1])
c = uri_count.get(key, 0)
uri_count[key] = c + 1
for key in uri_count:
print "%s\t%s" % (key, uri_count.get(key, 0))

从上面的代码可以看出来,python的脚本需要从标准输入(sys.stdin)中接入数据。

执行Mapreduce

1
2
3
4
5
6
7
8
9
/home/hadoop/yarn-2.8.1/bin/hadoop jar /home/hadoop/yarn-2.8.1/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar \
-D mapreduce.job.queuename=bigdata.queue \
-input hdfs://nameservice1/data/mylogs/api_request/2018/07/03/*/* \
-output /tmp/20180703SpecialUri \
-mapper "specialUriMapper.py" \
-reducer "specialUriReducer.py" \
-file /home/hadoop/script/user_action/specialUriMapper.py \
-file /home/hadoop/script/user_action/specialUriReducer.py \
-file /home/hadoop/script/user_action/kickA.log

参数说明:

-D mapreduce.job.queuename用指定需要运行MR的队列
-input MR的输入
-output MR的输出
-mapper 指定执行MR中Mapper的程序
-reducer 指定执行MR中Reducer的程序
-file 需要一起上传的文件,如果python程序中使用了其他的数据文件,可以通过这个参数一起上传。

其他一些参数:

-D mapreduce.job.name Job的名称
-D mapreduce.job.user.name
-D mapreduce.job.node-label-expression
-D mapreduce.job.queuename
-D mapreduce.map.memory.mb
-D mapreduce.reduce.memory.mb

netty-study

发表于 2018-06-29   |   分类于 netty

NETTY学习笔记

kafka-script

发表于 2017-05-24

本文主要讨论kafka服务的相关启动和关闭脚本。

kafka-server-start.sh

Kafka服务的启动脚本,正确的用法为 kafka-server-start.sh [-daemon] server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 如果执行脚本时传入的参数小于1个,则退出执行并提示用户需要指定服务属性配置文件, 此处也说明了执行kafka-server-start.sh的正确用法
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties"
exit 1
fi
# $0 表示的是当前shell的文件名,dirname用来获取当前shell文件的所在目录
base_dir=$(dirname $0)
# 读取环境变量中的KAFKA_LOG4J_OPTS的信息,如果没有配置该环境变量,则将kafka目录下conf中的log4j.properties作为配置添加到环境变量中,配置给KAFKA_LOG4J_OPTS
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
# 读取环境变量中KAFKA_HEAP_OPTS的信息,如果没有配置该环境变量,则使用默认配置"-Xmx1G -Xms1G"来配置,并添加到环境变量"KAFKA_HEAP_OPTS"中
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
# 定义一个额外的参数 name,为kafka服务指定了进程名
EXTRA_ARGS="-name kafkaServer -loggc"
# 如果服务要作为后台进程运行,则需要添加-daemon参数,而且这个参数必须是第一个参数,如果第一个参数是-daemon,则为进程添加自定义的名称
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
# 启动kafka服务,由此处也可以看出来,可以使用kafka-run-class.sh来执行相关的类,其中$@表示的是命令行传入的所有参数,这里要启动的类名为kafka.Kafka
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka $@

kafka-server-stop.sh

Kafka服务的停止脚本,其实就是查找KafkaServer对应的进程号,并kill。

1
2
# 在进程中过滤包含"kafka.Kafka"且不包含"grep"的java进程,截取进程号kill掉
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM

kafka-run-class.sh

kafka-run-class.sh是用来运行class的脚本。正确的用法为 kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# 验证kafka-run-class脚本的参数
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 1
fi
# 获取Kafka的基目录,就是当前目录(bin)的上一层目录
base_dir=$(dirname $0)/..
# 创建Kafka的日志目录,首先从环境变量“LOG_DIR”中读取,如果没有配置LOG_DIR,则使用Kafka基目录下的logs目录作为日志目录
# create logs directory
if [ "x$LOG_DIR" = "x" ]; then
LOG_DIR="$base_dir/logs"
fi
# 如果日志目录目存在则创建日志目录
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
fi
# 获取Scala的版本号,首先从环境变量 SCALA_VERSION 中读取,如果没有配置,则使用默认值 2.10.4
if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.10.4
fi
# 获取Scala库的版本号,首先从环境变量 SCALA_BINARY_VERSION 中读取,如果没有配置,则使用默认值 2.10
if [ -z "$SCALA_BINARY_VERSION" ]; then
SCALA_BINARY_VERSION=2.10
fi
# 这里开始加载各种依赖的jar包,并将这些jar包添加到CLASSPATH环境变量中,由此也可以看出运行完整的Kafka服务(支持各种consumer/producer)需要依赖的jar包
# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
# 将Kafka依赖Scala的jar包添加到CLASSPATH中
for file in $base_dir/core/build/dependant-libs-${SCALA_VERSION}*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka的示例jar添加到CLASSPATH中
for file in $base_dir/examples/build/libs//kafka-examples*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将kafka的hadoop consumer相关jar包添加到CLASSPATH中
for file in $base_dir/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka的hadoop producer相关jar包添加到CLASSPATH中
for file in $base_dir/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka客户端相关的jar包添加到CLASSPATH中
for file in $base_dir/clients/build/libs/kafka-clients*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka的libs下的jar包添加到CLASSPATH中
# classpath addition for release
for file in $base_dir/libs/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 将Kafka依赖的Scala对应版本的库添加到CLASSPATH中
for file in $base_dir/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# 以下是Java管理扩展的设置
# 如果没有在环境变量中设置KAFKA_JMX_OPTS,则将Kafka的JMX配置关闭
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# 如果设置了KAFKA_JMX_OPTS环境变量,则利用这个值来设置变量KAFKA_JMX_OPTS的值,该值用于指定虚拟机的信息
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
# Log4j的配置
# Log4j settings 如果环境变量中没有设置KAFKA_LOG4J_OPTS,则使用Kafka基目录下conf/tools-log4j.properties来设置KAFKA_LOG4J_OPTS变量
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/tools-log4j.properties"
fi
# 根据环境变量LOG_DIR和KAFKA_LOG4J_OPTS来生成变量KAFKA_LOG4J_OPTS的新的值
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
# 判断环境变量KAFKA_OPTS是否有相关设置
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS=""
fi
# 判断环境变量JAVA_HOME中是否有值,如果不存在则使用默认的java,如果有,则使用该目录下指定的java
# Which java to use
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
# Kafka的内存配置,如果环境变量KAFKA_HEAP_OPTS的值为空,则设置值为默认值-Xmx256M
# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; then
KAFKA_HEAP_OPTS="-Xmx256M"
fi
# 如果没有设置环境变量KAFKA_JVM-PERFORMANCE_OPTS,则使用默认值进行配置
# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
# 这里对脚本传入的参数进行解析,提取守护进程名/是否后台运行/GC日志这个三个信息
# 第一个case,如果循环到了-name参数,则读取-name的下一参数,下一个参数必定是后台进程的名字,而且控制台的输出日志文件也是该名字
# 第二个case,如果循环到了-loggc,则表示要记录GC日志,记录GC日志的另一个要求是配置KAFKA_GC_LOG_OPTS环境变量
# 第三个case,如果循环到了-daemon,则表示服务以后台进程的方式运行
while [ $# -gt 0 ]; do
COMMAND=$1
case $COMMAND in
-name)
DAEMON_NAME=$2
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
shift 2
;;
-loggc)
if [ -z "$KAFKA_GC_LOG_OPTS"] ; then
GC_LOG_ENABLED="true"
fi
shift
;;
-daemon)
DAEMON_MODE="true"
shift
;;
*)
break
;;
esac
done
# 如果启用了GC日志,GC日志的名字为后台进程的名字[-name指定]-gc.log。
# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
fi
# 启动Java进程,将上面的所有信息整合在一起,使用指定的Java,还有各种参数,这里区分了运行模式,其实就是将进程作为后台进程运行还是前台进程运行而已
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

kafka-topics.sh

kafka-topics.sh是用来操作Kafka的Topic的脚本,其内部通过kafka-run-class.sh脚本来调用kafka.admin.TopicCommand来实现Topic的操作。

1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

1234
baimoon

baimoon

Baimoon's blog

66 日志
24 分类
30 标签
GitHub
Links
  • xrange
© 2016-07 - 2020 baimoon
由 Hexo 强力驱动
主题 - NexT.Muse