spark-2-11-metircs

本文按照Metric的基本认识、Spark对Metrics system的配置、源码分析MetricsSystem的顺序进行学习

Metrics

目前最流行的metrics库是dropwizard/metircs,spark使用的也是这个库。下面我们介绍一下dropwizard/metircs的概念和用法。

Maven依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>
</dependencies>

Metric的基本使用

MetricRegistry类是Metrics的核心,他是存放应用中所有metrics的容器,也是我们使用Metrics的第一步:

1
MetricRegistry metricRegistry = new MetricsRegistry();

每个Metrics都有一个唯一的名字,我们可以通过MetricRegistry.name来生成:

1
metricName = MetricRegistry.name("name", "namesecond", "namethrid"); //生成name.namescond.namethrid

有了MetricRegistry和Metric之后,接下来需要进行注册

1
metricRegistry.register(metricName, MetricType)

注册的时候,需要指定Metric的类型,详细的类型,后面会介绍。
以下是Spark中的一些使用参考:

1
2
3
4
5
override val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
override def getValue: Int = threadPool.getActiveCount()
})

Metrics类型

Metrics有五种类型,分别是Gauges、Counters、Meters、Histograms和Timers。

Gauges

Gauges是最简单的Metrics类型,该对象中只有一个方法getValue用于返回统计的值,下面是Gauges接口的定义:

1
2
3
public interface Gauge<T> extends Metric {
T getValue();
}

从方法可以看出,Gauge中的getValue可以返回与Gauge定义类型相同的任意类型。
以下是Spark中Gauges类型Metric的定义:

1
2
3
4
5
6
7
8
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
override def getValue: Int = threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
override def getValue: Long = threadPool.getCompletedTaskCount()
})

分别定义了一个Int类型和一个Long类型的Gauges。

Counters

Counter是计数器,既然是计数器,因此可增可减。Counter其实是对AtomicLong的封装。

Meters支持的方法

inc(): 计数器自加。
dec(): 计数器自减。

生成方法

1
pendingJobs = registry.counter(MetricRegistry.name(Queue.class,"pending-jobs","size"));

Meters

Meters用来计算事件发生的速率。Meters会统计近1分钟、5分钟、15分钟以及全部时间的速率。

Meters可用的方法

mark()方法:表示发生一次事件。

生成方法

1
Meter meterTps = registry.meter(MetricRegistry.name(MeterTest.class,"request","tps"));

Histograms

Histograms统计数据的分布情况。比如最大值、最小值、中间值、中位数、75百分位、90百分位、98百分位、99百分位和99.9百分位的值。

Histograms可用的方法

histogram.update(…): 更新一个数

生成方法

1
2
Histogram histogram - new Histogram(...)
registery.register(MetricRegistry.name(...), histogram)

Timers

Timer可以理解为Histogram和Meter的结合。

Timer可用的方法

timer.time(): 记录时间

生成方法

1
Timer timer = registry.timer(MetricRegistry.name(...))

Metrics的输出

通过JMX

1
2
final JmxReporter reporter = jmxReporter.forRegistry(registry).build()
reporter.start()

一旦reporter被启动,所有在registry中的metrics可以通过JConsole或VisualVM(如果安装了MBeans插件)可见。

其他方式

除了JMX,Metrics还能够以HTTP、STDOUT、CSV、SLFJ、Ganglia和Graphite的方式输出。 详细可以参考:https://metrics.dropwizard.io/3.1.0/getting-startedhttps://metrics.dropwizard.io/3.1.0/manual/core/

Spark中Metrics的配置

Spark的Metrics的配置,在$SPARK_HOME/conf/metrics.properties文件中配置。如果用户需要制定自己的配置文件,可以通过spark.metrics.conf来指定。默认情况下,driver或executor的root命名空间为spark.app.id,但是在某些情况下,用户想要跨driver或executor跟踪Metrics,对于这种情况,可以使用spark.metrics.namespace来自定义命名空间。

实例

在Spark中,Metrics根据Spark的组建被划分中不同的实例。对于每个实例,可以配置一组sinks来报告metrics。如下是当前支持的实例:

实例名 解释
master standalone模式Spark的master进程
application master中的组件,对不同applications进行报告
worker standalone模式Spark的worker进程
executor 一个Spark executor
driver Spark的driver进程
shuffleService Spark的Shuffle服务
applicationMaster 当在Yarn上运行时,Spark的application master

Sinks

每个实例能够汇报给0个或多个sink。Sinks包含在org.apache.spark.metrics.sink包中,有如下sink:

sink 解释
ConsoleSink 记录metrics信息到 console
CSVSink 以一定的间隔,将metrics数据导出到CSV文件
JmxSink 将metrics注册到JMX console
MetricsServlet 在已有的Spark UI中添加一个Servlet,以JONS数据格式来服务metrics数据
GraphiteSink 将metrics发送给Graphite节点
Slf4jSink 将metrics发送给slf4j
StatsdSink 将metrics发送给StatsD节点
GangliaSink 将metrics发送给Ganglia节点,需要重新编译Spark,将gangliaSink打包进去,默认不包含

metrics.properties

可以参考:$SPARK_HOME/conf/metrics.properties.template文件,在这个文件中对source、sink等作了详细的解释,并提供了例子。

从Spark源码看Metrics的使用

Spark中对于Metrics的处理集中在core模块下的org.apache.spark.metrics包中。
包结构如下:
image.png
其中MetricsSystem是该模块的入口,MetricsConfig为模块的配置管理,Source是Metric系统的数据源,Sink是Metrics的量输出。

MetricsConfig

MetricsConfig用来加载metrics的配置,并对MetricsSystem进行相关的配置。在MetricsSystem类中,会生成MetricsConfig对象,并进行初始化操作。

首先看看MetricsConfig的一些属性

正如Spark文档说的,默认的Metrics配置文件为 metrics.properties

initialize方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def initialize() {
// 添加默认属性,用于没有配置文件的情况
setDefaultProperties(properties)
// 从metrics配置文件中加载metrics配置
loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
// 从Spark配置文件中加载metrics配置
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
case _ =>
}
perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
// 判断是否以 * 开始的实例配置,这种表示是所有实例的配置(称为默认实例配置)
if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
// 获取默认实例配置
val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
// 循环所有非默认实例配置,将实例配置添加到非实例配置中(注意,实例配置已有的属性不会被覆盖)
for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
(k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}

在initialize方法中,首先会调用setDefaultProperties方法来加载默认配置,这是为了应对没有设置Metrics配置文件的情况(连默认的metrics.properties都没有提供)。
然后会调用loadPropertiesFromFile方法,从指定的配置文件中加载Metrics配置。
再然后,会读取Spark的所有配置项,从中筛选以”spark.metrics.conf.”为前缀的配置项,并将配置项添加到属性中。需要注意的是,在添加属性的时候,并非以配置项的全名作为属性,而是以子名作为属性名,例如:spark.metrics.conf.xxx,那么会使用xxx作为属性名,而spark.metrics.conf.xxx.x1则会使用xxx.x1作为属性名。
最后,该方法还会对实例属性(指定了实例的配置)进行整理。实例属性的配置类似driver.path这样的,driver就是实例,因此这里就有一个问题,对于所有实例通用的配置应该怎么设置呢?答案是.path这样的。有的则表示通用的默认值,没有的配置,第一个dot(.)前的为实例。有实例的配置会覆盖没有实例的配置。代码中的属性private val INSTANCE_REGEX = “^(\|[a-zA-Z]+)\.(.+)”.r,就解释了实例配置的模式,要不就是以“”开头,要么就以字母开头,以字母开头的表示具体实例,以“”开头的表示所有实例通用。
因此也就可以理解setDefaultProperties方法中设置默认值的作用。整理的作用就是将默认实例配置(以“*”开头的实例配置)添加到非默认实例配置中(不会覆盖非默认实例配置已有的属性)。
比如,有如下属性

1
{"\\*.class"->"default_class", "\\*.path"->"default_path, "driver.path"->"driver_path"}

对于driver实例,他最后得到的属性为

1
{{"driver"->Map("path"->"driver_path", "class"->"default_class"}}

setDefaultProperties

当没有配置metrics配置文件时(是指连默认的配置文件都不存在),采取的默认Metrics配置。

1
2
3
4
5
6
7
8
private def setDefaultProperties(prop: Properties) {
// 默认开启了servlet类型的sink,指定servlet的class为org.apache.spark.metrics.sink.MetricsServlet
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
// 先为所有实例设置,再为master和applications类型的实例设置
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}

loadPropertiesFromFile

方法需要path参数(配置文件的名字-从Spark的配置项spark.metrics.conf中读取),如果给定的path存在,则从这个文件中加载,如果path不存在,则从classPath目录中加载默认配置文件metrics.properties。

MetricsSystem

MetricsSystem类是对外提供的一个对象,从调用来看,在Master、SparkEnv和Work中都有创建MetricsSystem的代码。其中Master和Work是对于standalone模式下创建的。SparkEnv中是对driver和executor进行创建的。生成MetricsSystem的时候是需要指定instance参数的。因此从创建MetricsSystem的代码我们也能够知道MetricsSystem都支持哪些instance。
TODO 对于applications的创建,有时间再找
这里,我们先抛开对MetricsSystem的操作,我们先看看MetricsSystem的实现

创建MetricsSystem

从所有的生成MetricsSyste的代码来看,MetricsSystem都是通过如下代码创建的:

1
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) // 这里创建的一个work实例的MetricsSystem

而对于createMetricsSystem方法的定义也很多简单,就是new了一个MetricsSystem对象:

1
2
3
4
def createMetricsSystem(
instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
}

注册Source

创建了MetricsSystem,接下来就需要向它注册source。我们以Worker中MetricsSystem的使用来作为参考。

1
2
3
metricsSystem.registerSource(workerSource)
metricsSystem.start()
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

因此,我们来看一下MetricsSystem的registerSource方法

1
2
3
4
5
6
7
8
9
def registerSource(source: Source) {
sources += source
try {
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}

将source添加Source列表中,然后调用buildRegistryName方法生成source注册使用的名字,然后进行注册,那么需要看一下buildRegistryName方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private[spark] def buildRegistryName(source: Source): String = {
val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {
if (metricsNamespace.isDefined && executorId.isDefined) {
MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
} else {
// Only Driver and Executor set spark.app.id and spark.executor.id.
// Other instance types, e.g. Master and Worker, are not related to a specific application.
if (metricsNamespace.isEmpty) {
logWarning(s"Using default name $defaultName for source because neither " +
s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
}
if (executorId.isEmpty) {
logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
s"not set.")
}
defaultName
}
} else { defaultName }
}

buildRegistryName方法,就是根据Source,构建一个向MetricRegistry注册的名字。这个名字是唯一的。其基本思路就是根据namespace(从spark.metrics.namespace或spark.app.id中取)和executorId(从spark.executor.id中取)构建一个注册用的名字,但是需要注意的是,是否使用这两个值作为构建的条件,要依赖实例是否是drvier或executor,否则一律使用source的名字生成。
注册好Source,调用MetricsSystem的start方法启动MetricsSystem。

启动start方法

1
2
3
4
5
6
7
8
def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
StaticSources.allSources.foreach(registerSource)
registerSources()
registerSinks()
sinks.foreach(_.start)
}

start方法代码很少,但是包含的东西却挺多。注册静态Source,所谓的静态Source,其实就是不依赖用户配置的Source。接着调用registerSources方法,这个注册是读取配置的文件中当前实例(instance)的配置,对Source的class进行实例化并注册。然后是调用registerSinks方法来注册Sink,也是将配置文件中当前实例(instance)的配置进行操作,对Sink的class进行实例化,并加入到Sink集合中等待调用(调用sink的report方法);最后启动所有的sink。

Source

Source就是Metrics的数据源,我们对Source中的统计数据进行操作,Metrics系统会从已经注册的Source中获取这些数据,然后由Sink报告出去。下面我们以WorkSource来进行分析,看看如何自定义一个Source。

WorkSource

先看一下WorkSource的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[worker] class WorkerSource(val worker: Worker) extends Source {
override val sourceName = "worker"
override val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
...
}

WorkSource中metric的类型只涉及到了Gauge,也就是这些值不需要用户参与。对于需要用户参与的我们可以参考StaticSources,稍后也会看到。WorkSource的逻辑很简单,就是注册了一些metric,并制定了这些metric如何取值。

StaticSources$HiveCatalogMetrics

接下来看一下StaticSources$HiveCatalogMetrics的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object HiveCatalogMetrics extends Source {
override val sourceName: String = "HiveExternalCatalog"
override val metricRegistry: MetricRegistry = new MetricRegistry()
val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
...
def reset(): Unit = {
METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
...
}
// clients can use these to avoid classloader issues with the codahale classes
def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
...
}

这个类中metric的类型为Counter,reset方法、incrementFetchedPartitions方法和incrementFilesDiscovered方法就是对这些Counter进行操作。

Sink

Spark中已经内置了多种Sink,有ConsoleSink、CsvSink、JmxSink、MetricsServlet等,基本可以满足我们的需求了。接下来我们分析一两个常用Sink的实现。

JmxSink

JmxSink的定义如下:

1
2
3
4
5
6
7
8
9
10
11
private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
override def start() {
reporter.start()
}
override def stop() {
reporter.stop()
}
override def report() { }
}

JmxSink继承Sink特征,因此它需要实现start()、stop()和report()方法。另外在JmxSink定义中创建了一个JmxReporter对象,创建reporter对象时需要MetricRegistry对象,这就将Source和Sink中的Report联系了起来(metric注册到Source的MetricRegistry,source注册到MetricsSystem的MetricRegistry,在生成Sink中Reporter的时候会使用MetrcsSystem中的MetricRegistry)。那么Sink是如何报告自己收集的metrics呢?我理解的是,内部的Reporter会自己报告,也可以调用Sink的report方法,report方法再调用reporter的report方法。(—-TODO,Reporter的汇报,会稍后补充)

ConsoleSink

ConsoleSink的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CONSOLE_DEFAULT_PERIOD
}
val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
override def report() {
reporter.report()
}
}

ConsoleSink中也实现了三个必须的方法,除此之外还定义了一个ConsoleReporter。而且它实现的三个方法其实也是对ConsoleReporter进行操作的。查看其他的Sink,也能够发现,每个Sink都有一个Reporter(除了MetricsServlet外,它的逻辑与普通Sink不同)。

Source和Sink的操作

Source的操作

对于Source的操作,我们就参考StaticSources$HiveCatalogMetrics中相关metric的操作吧。
如下是HiveClientImpl的一个方法,他就在操作Source的METRIC_PARTITIONS_FETCHED。

1
2
3
4
5
6
7
8
9
10
11
12
13
override def getPartitions(
table: CatalogTable,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
val parts = spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
case Some(s) =>
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
}
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}

Sink的操作

对于Sink的调用,是在MetricsSystem的report方法中触发的:

1
2
3
def report() {
sinks.foreach(_.report())
}

对于MetricsSystem中report方法的调用,会在各个组建停止的时候调用,进行强制汇报。

Source、Sink、MetricsSystem以及MetricsConfig之间的关系

image.png

自定义Source

要定义自己的Source对象,需要实现org.apache.spark.metrics.source.Source特征,并实现sourceName(String类型)和metricRegistry(MetricRegistry类型)方法。之后定义自己的metric类型即可,可用的metric类型已经在上面的第一部分进行介绍。如下是DAGScheduulerSource的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "DAGScheduler"
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.runningStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waitingStages.size
})
metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.numTotalJobs
})
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
/** Timer that tracks the time to process messages in the DAGScheduler's event loop */
val messageProcessingTimer: Timer =
metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
}

在这个类的定义中,实现了Source特征,并重写了metricRegistry和sourceName方法。接下来注册了自己要统计的metric,默认是以servlet中展示这些信息,我们可以通过www.xxxxx:8088/proxy/application_id/metrics/json来查看对应的输出:

1
2
3
...
"application_1530945921317_6019078.driver.DAGScheduler.job.activeJobs":{"value":2},"application_1530945921317_6019078.driver.DAGScheduler.job.allJobs":{"value":77},"application_1530945921317_6019078.driver.DAGScheduler.stage.failedStages":{"value":0},"application_1530945921317_6019078.driver.DAGScheduler.stage.runningStages":{"value":2},"application_1530945921317_6019078.driver.DAGScheduler.stage.waitingStages":{"value":0},
...

自定义的类截图
image.png
该类会在DAGScheduler中进行实例化:
image.png
然后在SparkContext中进行注册:
image.png
以下是自定义的source注册后,在servlet中的显示
image.png

问题

如果在配置文件中配置自己的source呢?关键是如何定义。。。需要看一下

对于配置文件中metrics的配置格式如下:

[instance].[sink|source].[name].[options] = xxxx
[instance]可以是master、worker、executor、driver或application,也就是明确了实例的类型。如果让所有实例都可以使用,可以使用“*”代替。
[sink|source]指定配置项是sink的信息还是source的信息,只能二选一。
[name]指定sink或source的名字。
[options]指定sink或source的相关属性,如class。

注意点:如果是需要进行配置的自定义source,有两点需要注意。

配置中必须通过class属性指定类名,程序会根据类名进行反射。
配置的自定义Source必须有不含构造参数的构造方法,否则无法实例化。

配置自定义Source

编辑 metrics.properties配置文件
增加如下配置即可

1
driver.source.rwmTest.class=org.apache.spark.metrics.source.RWMTestSource

UI中的显示信息
image.png

配置项

Spark config

配置项 默认值 意义
spark.metrics.conf metrics.properties Metrics系统的配置文件
spark.metrics.conf.xxx 将xxx作为属性,添加到属性中,可以将其理解为 metrics.properties中配置的一种转移,会和metrics.properties中的配置项放在一起,而且此配置优先级较高
spark.metrics.namespace metrics的命名空间,参与构建Metrics source注册名的生成
spark.app.id 如果spark.metrics.namespace没有指定值,则使用该值作为namespace,参与构建Metrics source注册名的生成
spark.executor.id executor的id,参数构建Metrics source注册名的生成

Metrics config

配置项 默认值 意义
source.(.+).(.+) source的配置信息,就是以source.xxx.xxx形式的,只能有两个dot(.)
sink.(.+).(.+) sink的配置信息,就是以sink.xxx.xxx形式的,只能有两个dot(.)

对于source和sink的配置,参考:$SPARK_HOME/conf/metrics.properties.template