本文按照Metric的基本认识、Spark对Metrics system的配置、源码分析MetricsSystem的顺序进行学习
Metrics
目前最流行的metrics库是dropwizard/metircs,spark使用的也是这个库。下面我们介绍一下dropwizard/metircs的概念和用法。
Maven依赖
|
|
Metric的基本使用
MetricRegistry类是Metrics的核心,他是存放应用中所有metrics的容器,也是我们使用Metrics的第一步:
每个Metrics都有一个唯一的名字,我们可以通过MetricRegistry.name来生成:
有了MetricRegistry和Metric之后,接下来需要进行注册
注册的时候,需要指定Metric的类型,详细的类型,后面会介绍。
以下是Spark中的一些使用参考:
Metrics类型
Metrics有五种类型,分别是Gauges、Counters、Meters、Histograms和Timers。
Gauges
Gauges是最简单的Metrics类型,该对象中只有一个方法getValue用于返回统计的值,下面是Gauges接口的定义:
从方法可以看出,Gauge中的getValue可以返回与Gauge定义类型相同的任意类型。
以下是Spark中Gauges类型Metric的定义:
分别定义了一个Int类型和一个Long类型的Gauges。
Counters
Counter是计数器,既然是计数器,因此可增可减。Counter其实是对AtomicLong的封装。
Meters支持的方法
inc(): 计数器自加。
dec(): 计数器自减。
生成方法
|
|
Meters
Meters用来计算事件发生的速率。Meters会统计近1分钟、5分钟、15分钟以及全部时间的速率。
Meters可用的方法
mark()方法:表示发生一次事件。
生成方法
|
|
Histograms
Histograms统计数据的分布情况。比如最大值、最小值、中间值、中位数、75百分位、90百分位、98百分位、99百分位和99.9百分位的值。
Histograms可用的方法
histogram.update(…): 更新一个数
生成方法
|
|
Timers
Timer可以理解为Histogram和Meter的结合。
Timer可用的方法
timer.time(): 记录时间
生成方法
|
|
Metrics的输出
通过JMX
|
|
一旦reporter被启动,所有在registry中的metrics可以通过JConsole或VisualVM(如果安装了MBeans插件)可见。
其他方式
除了JMX,Metrics还能够以HTTP、STDOUT、CSV、SLFJ、Ganglia和Graphite的方式输出。 详细可以参考:https://metrics.dropwizard.io/3.1.0/getting-started 和 https://metrics.dropwizard.io/3.1.0/manual/core/
Spark中Metrics的配置
Spark的Metrics的配置,在$SPARK_HOME/conf/metrics.properties文件中配置。如果用户需要制定自己的配置文件,可以通过spark.metrics.conf来指定。默认情况下,driver或executor的root命名空间为spark.app.id,但是在某些情况下,用户想要跨driver或executor跟踪Metrics,对于这种情况,可以使用spark.metrics.namespace来自定义命名空间。
实例
在Spark中,Metrics根据Spark的组建被划分中不同的实例。对于每个实例,可以配置一组sinks来报告metrics。如下是当前支持的实例:
实例名 | 解释 |
---|---|
master | standalone模式Spark的master进程 |
application | master中的组件,对不同applications进行报告 |
worker | standalone模式Spark的worker进程 |
executor | 一个Spark executor |
driver | Spark的driver进程 |
shuffleService | Spark的Shuffle服务 |
applicationMaster | 当在Yarn上运行时,Spark的application master |
Sinks
每个实例能够汇报给0个或多个sink。Sinks包含在org.apache.spark.metrics.sink包中,有如下sink:
sink | 解释 |
---|---|
ConsoleSink | 记录metrics信息到 console |
CSVSink | 以一定的间隔,将metrics数据导出到CSV文件 |
JmxSink | 将metrics注册到JMX console |
MetricsServlet | 在已有的Spark UI中添加一个Servlet,以JONS数据格式来服务metrics数据 |
GraphiteSink | 将metrics发送给Graphite节点 |
Slf4jSink | 将metrics发送给slf4j |
StatsdSink | 将metrics发送给StatsD节点 |
GangliaSink | 将metrics发送给Ganglia节点,需要重新编译Spark,将gangliaSink打包进去,默认不包含 |
metrics.properties
可以参考:$SPARK_HOME/conf/metrics.properties.template文件,在这个文件中对source、sink等作了详细的解释,并提供了例子。
从Spark源码看Metrics的使用
Spark中对于Metrics的处理集中在core模块下的org.apache.spark.metrics包中。
包结构如下:
其中MetricsSystem是该模块的入口,MetricsConfig为模块的配置管理,Source是Metric系统的数据源,Sink是Metrics的量输出。
MetricsConfig
MetricsConfig用来加载metrics的配置,并对MetricsSystem进行相关的配置。在MetricsSystem类中,会生成MetricsConfig对象,并进行初始化操作。
首先看看MetricsConfig的一些属性
正如Spark文档说的,默认的Metrics配置文件为 metrics.properties
initialize方法
|
|
在initialize方法中,首先会调用setDefaultProperties方法来加载默认配置,这是为了应对没有设置Metrics配置文件的情况(连默认的metrics.properties都没有提供)。
然后会调用loadPropertiesFromFile方法,从指定的配置文件中加载Metrics配置。
再然后,会读取Spark的所有配置项,从中筛选以”spark.metrics.conf.”为前缀的配置项,并将配置项添加到属性中。需要注意的是,在添加属性的时候,并非以配置项的全名作为属性,而是以子名作为属性名,例如:spark.metrics.conf.xxx,那么会使用xxx作为属性名,而spark.metrics.conf.xxx.x1则会使用xxx.x1作为属性名。
最后,该方法还会对实例属性(指定了实例的配置)进行整理。实例属性的配置类似driver.path这样的,driver就是实例,因此这里就有一个问题,对于所有实例通用的配置应该怎么设置呢?答案是.path这样的。有的则表示通用的默认值,没有的配置,第一个dot(.)前的为实例。有实例的配置会覆盖没有实例的配置。代码中的属性private val INSTANCE_REGEX = “^(\|[a-zA-Z]+)\.(.+)”.r,就解释了实例配置的模式,要不就是以“”开头,要么就以字母开头,以字母开头的表示具体实例,以“”开头的表示所有实例通用。
因此也就可以理解setDefaultProperties方法中设置默认值的作用。整理的作用就是将默认实例配置(以“*”开头的实例配置)添加到非默认实例配置中(不会覆盖非默认实例配置已有的属性)。
比如,有如下属性
对于driver实例,他最后得到的属性为
setDefaultProperties
当没有配置metrics配置文件时(是指连默认的配置文件都不存在),采取的默认Metrics配置。
loadPropertiesFromFile
方法需要path参数(配置文件的名字-从Spark的配置项spark.metrics.conf中读取),如果给定的path存在,则从这个文件中加载,如果path不存在,则从classPath目录中加载默认配置文件metrics.properties。
MetricsSystem
MetricsSystem类是对外提供的一个对象,从调用来看,在Master、SparkEnv和Work中都有创建MetricsSystem的代码。其中Master和Work是对于standalone模式下创建的。SparkEnv中是对driver和executor进行创建的。生成MetricsSystem的时候是需要指定instance参数的。因此从创建MetricsSystem的代码我们也能够知道MetricsSystem都支持哪些instance。
TODO 对于applications的创建,有时间再找
这里,我们先抛开对MetricsSystem的操作,我们先看看MetricsSystem的实现
创建MetricsSystem
从所有的生成MetricsSyste的代码来看,MetricsSystem都是通过如下代码创建的:
|
|
而对于createMetricsSystem方法的定义也很多简单,就是new了一个MetricsSystem对象:
|
|
注册Source
创建了MetricsSystem,接下来就需要向它注册source。我们以Worker中MetricsSystem的使用来作为参考。
因此,我们来看一下MetricsSystem的registerSource方法
将source添加Source列表中,然后调用buildRegistryName方法生成source注册使用的名字,然后进行注册,那么需要看一下buildRegistryName方法
buildRegistryName方法,就是根据Source,构建一个向MetricRegistry注册的名字。这个名字是唯一的。其基本思路就是根据namespace(从spark.metrics.namespace或spark.app.id中取)和executorId(从spark.executor.id中取)构建一个注册用的名字,但是需要注意的是,是否使用这两个值作为构建的条件,要依赖实例是否是drvier或executor,否则一律使用source的名字生成。
注册好Source,调用MetricsSystem的start方法启动MetricsSystem。
启动start方法
|
|
start方法代码很少,但是包含的东西却挺多。注册静态Source,所谓的静态Source,其实就是不依赖用户配置的Source。接着调用registerSources方法,这个注册是读取配置的文件中当前实例(instance)的配置,对Source的class进行实例化并注册。然后是调用registerSinks方法来注册Sink,也是将配置文件中当前实例(instance)的配置进行操作,对Sink的class进行实例化,并加入到Sink集合中等待调用(调用sink的report方法);最后启动所有的sink。
Source
Source就是Metrics的数据源,我们对Source中的统计数据进行操作,Metrics系统会从已经注册的Source中获取这些数据,然后由Sink报告出去。下面我们以WorkSource来进行分析,看看如何自定义一个Source。
WorkSource
先看一下WorkSource的定义:
WorkSource中metric的类型只涉及到了Gauge,也就是这些值不需要用户参与。对于需要用户参与的我们可以参考StaticSources,稍后也会看到。WorkSource的逻辑很简单,就是注册了一些metric,并制定了这些metric如何取值。
StaticSources$HiveCatalogMetrics
接下来看一下StaticSources$HiveCatalogMetrics的定义:
这个类中metric的类型为Counter,reset方法、incrementFetchedPartitions方法和incrementFilesDiscovered方法就是对这些Counter进行操作。
Sink
Spark中已经内置了多种Sink,有ConsoleSink、CsvSink、JmxSink、MetricsServlet等,基本可以满足我们的需求了。接下来我们分析一两个常用Sink的实现。
JmxSink
JmxSink的定义如下:
JmxSink继承Sink特征,因此它需要实现start()、stop()和report()方法。另外在JmxSink定义中创建了一个JmxReporter对象,创建reporter对象时需要MetricRegistry对象,这就将Source和Sink中的Report联系了起来(metric注册到Source的MetricRegistry,source注册到MetricsSystem的MetricRegistry,在生成Sink中Reporter的时候会使用MetrcsSystem中的MetricRegistry)。那么Sink是如何报告自己收集的metrics呢?我理解的是,内部的Reporter会自己报告,也可以调用Sink的report方法,report方法再调用reporter的report方法。(—-TODO,Reporter的汇报,会稍后补充)
ConsoleSink
ConsoleSink的定义:
ConsoleSink中也实现了三个必须的方法,除此之外还定义了一个ConsoleReporter。而且它实现的三个方法其实也是对ConsoleReporter进行操作的。查看其他的Sink,也能够发现,每个Sink都有一个Reporter(除了MetricsServlet外,它的逻辑与普通Sink不同)。
Source和Sink的操作
Source的操作
对于Source的操作,我们就参考StaticSources$HiveCatalogMetrics中相关metric的操作吧。
如下是HiveClientImpl的一个方法,他就在操作Source的METRIC_PARTITIONS_FETCHED。
Sink的操作
对于Sink的调用,是在MetricsSystem的report方法中触发的:
对于MetricsSystem中report方法的调用,会在各个组建停止的时候调用,进行强制汇报。
Source、Sink、MetricsSystem以及MetricsConfig之间的关系
自定义Source
要定义自己的Source对象,需要实现org.apache.spark.metrics.source.Source特征,并实现sourceName(String类型)和metricRegistry(MetricRegistry类型)方法。之后定义自己的metric类型即可,可用的metric类型已经在上面的第一部分进行介绍。如下是DAGScheduulerSource的定义:
在这个类的定义中,实现了Source特征,并重写了metricRegistry和sourceName方法。接下来注册了自己要统计的metric,默认是以servlet中展示这些信息,我们可以通过www.xxxxx:8088/proxy/application_id/metrics/json来查看对应的输出:
自定义的类截图
该类会在DAGScheduler中进行实例化:
然后在SparkContext中进行注册:
以下是自定义的source注册后,在servlet中的显示
问题
如果在配置文件中配置自己的source呢?关键是如何定义。。。需要看一下
对于配置文件中metrics的配置格式如下:
[instance].[sink|source].[name].[options] = xxxx
[instance]可以是master、worker、executor、driver或application,也就是明确了实例的类型。如果让所有实例都可以使用,可以使用“*”代替。
[sink|source]指定配置项是sink的信息还是source的信息,只能二选一。
[name]指定sink或source的名字。
[options]指定sink或source的相关属性,如class。
注意点:如果是需要进行配置的自定义source,有两点需要注意。
配置中必须通过class属性指定类名,程序会根据类名进行反射。
配置的自定义Source必须有不含构造参数的构造方法,否则无法实例化。
配置自定义Source
编辑 metrics.properties配置文件
增加如下配置即可
UI中的显示信息
配置项
Spark config
配置项 | 默认值 | 意义 |
---|---|---|
spark.metrics.conf | metrics.properties | Metrics系统的配置文件 |
spark.metrics.conf.xxx | 无 | 将xxx作为属性,添加到属性中,可以将其理解为 metrics.properties中配置的一种转移,会和metrics.properties中的配置项放在一起,而且此配置优先级较高 |
spark.metrics.namespace | 无 | metrics的命名空间,参与构建Metrics source注册名的生成 |
spark.app.id | 无 | 如果spark.metrics.namespace没有指定值,则使用该值作为namespace,参与构建Metrics source注册名的生成 |
spark.executor.id | 无 | executor的id,参数构建Metrics source注册名的生成 |
Metrics config
配置项 | 默认值 | 意义 |
---|---|---|
source.(.+).(.+) | 无 | source的配置信息,就是以source.xxx.xxx形式的,只能有两个dot(.) |
sink.(.+).(.+) | 无 | sink的配置信息,就是以sink.xxx.xxx形式的,只能有两个dot(.) |
对于source和sink的配置,参考:$SPARK_HOME/conf/metrics.properties.template