Baimoon's Note


  • 首页

  • 分类

  • 归档

  • 标签

Spark Trouble Shooter

发表于 2020-07-16   |   分类于 bigdata

_temporary目录导致加载动态分区报错

在执行Spark任务时发生如下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key partition_app_type is null or empty;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:104)
at org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:888)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:385)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:68)
at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:164)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:183)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:68)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:898)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:64)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:340)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:379)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311)
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:464)
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:480)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:174)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:779)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key partition_app_type is null or empty
at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1693)
at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1664)
at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1276)
at org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1515)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitions(HiveShim.scala:912)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveClientImpl.scala:793)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:791)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:791)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:320)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:258)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:257)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:303)
at org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:791)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:900)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:888)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:888)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
... 27 more
Error in query: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key partition_app_type is null or empty;

当Spark的Job提交后,在调用loadDynamicPartitions方法时抛出以上错误。

在Hive中增加日志,检查加载动态分区时所加载的目录信息:

1
2
3
4
5
6
7
20/07/13 20:59:34,560 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/_temporary
20/07/13 20:59:34,561 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=1
20/07/13 20:59:34,561 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=2
20/07/13 20:59:34,562 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=3
20/07/13 20:59:34,562 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=4
20/07/13 20:59:34,563 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=5
20/07/13 20:59:34,564 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=6

Hive的getPartition方法中增加日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
20/07/13 20:59:35,413 WARN Hive: rwm Test ---- fullPartSpec info:
20/07/13 20:59:35,413 WARN Hive: rwm Test ---- key=partition_date, value=20200706
20/07/13 20:59:35,413 WARN Hive: rwm Test ---- key=partition_app_type, value=4
20/07/13 20:59:35,413 WARN Hive: rwm Test : getPartition: name='partition_date' value='20200706'
20/07/13 20:59:35,413 WARN Hive: rwm Test : getPartition: name='partition_app_type' value='4'
20/07/13 20:59:35,569 WARN Hive: rwm Test : getPartition: name='partition_date' value='20200706'
20/07/13 20:59:35,569 WARN Hive: rwm Test : getPartition: name='partition_app_type' value='4'
20/07/13 20:59:35,789 WARN Hive: rwm Test ----
20/07/13 20:59:35,789 WARN Hive: rwm Test ---- fullPartSpec info:
20/07/13 20:59:35,789 WARN Hive: rwm Test ---- key=partition_date, value=20200706
20/07/13 20:59:35,789 WARN Hive: rwm Test ---- key=partition_app_type, value=
20/07/13 20:59:35,789 WARN Hive: rwm Test : getPartition: name='partition_date' value='20200706'
20/07/13 20:59:35,789 WARN Hive: rwm Test : getPartition: name='partition_app_type' value=''

发现在解析_temporary目录时,就会抛出之前的异常信息了。

_temporary目录时Spark Write Job写数据时的临时目录,也是用来保证数据提交一致性的一个目录。该目录是在newTaskTempFile方法中,调用FileOutputCommitter的workPath得到的。理论上来说,这个目录在Job提交后(调用commitJob)之后,是会进行清理的,但是因为推测执行的原因,commitJob之后,可能因为某些推测执行的task仍然在运行,这个目录没有被删除掉。

目前的解决方案是在调用loadDynamicPartition方法之前,先判断_temporary是否存在,如果存在,则删除一次。因为这个时候已经commitJob了,所以认为_temporary已经没有用了。增加此逻辑后,对比任务的执行结果,条数一致,认为可行。

httpClient

发表于 2020-07-16

Spark Catalog

发表于 2020-07-13

Spark Catalog
Spark SQL提供了执行sql语句功能,sql语句是以表的方式组织、使用数据,那么表本身是如何组织存储的呢?肯定会有元数据之类的东西存在,Catalog就是Spark2.0之后提供的访问元数据的类。

1
2

Catalog提供了一些API,用来对数据库、表、视图、缓存、列、函数进行操作。

Catalog相关的代码位于spark-sql包的org.apache.spark.sql.catalog目录下,它定义的对象有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Database(
val name: String,
@Nullable val description: String,
val locationUri: String) extends DefinedByConstructorParams
class Table(
val name: String,
@Nullable val database: String,
@Nullable val description: String,
val tableType: String,
val isTemporary: Boolean) extends DefinedByConstructorParams
class Column(
val name: String,
@Nullable val description: String,
val dataType: String,
val nullable: Boolean,
val isPartition: Boolean,
val isBucket: Boolean) extends DefinedByConstructorParams
class Function(
val name: String,
@Nullable val database: String,
@Nullable val description: String,
val className: String,
val isTemporary: Boolean) extends DefinedByConstructorParams

而对于Catalog本身,它定义了接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def currentDatabase: String
def setCurrentDatabase(dbName: String): Unit
def listDatabases(): Dataset[Database]
def listTables(): Dataset[Table]
def listTables(dbName: String): Dataset[Table]
def listFunctions(): Dataset[Function]
def listFunctions(dbName: String): Dataset[Function]
def listColumns(tableName: String): Dataset[Column]
def listColumns(dbName: String, tableName: String): Dataset[Column]
def getDatabase(dbName: String): Database
def getTable(tableName: String): Table
def getTable(dbName: String, tableName: String): Table
def getFunction(functionName: String): Function
def getFunction(dbName: String, functionName: String): Function
def databaseExists(dbName: String): Boolean
def tableExists(tableName: String): Boolean
def tableExists(dbName: String, tableName: String): Boolean
def functionExists(functionName: String): Boolean
def functionExists(dbName: String, functionName: String): Boolean
def createTable(tableName: String, path: String, source: String): DataFrame
def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
def dropTempView(viewName: String): Boolean
def dropGlobalTempView(viewName: String): Boolean
def recoverPartitions(tableName: String): Unit
def isCached(tableName: String): Boolean
def cacheTable(tableName: String): Unit
def uncacheTable(tableName: String): Unit
def clearCache(): Unit
def refreshTable(tableName: String): Unit
def refreshByPath(path: String): Unit

在Spark中,Catalog只有一个实现类:CatalogImpl,而Catalog的实现依赖的是SessionCatalog(通过sparkSession.sessionState.catalog)。在SessionCatalog中定义了一些功能:

1
2
3
4
5
# 数据库是否存在
def databaseExists(db: String): Boolean = {
val dbName = formatDatabaseName(db)
externalCatalog.databaseExists(dbName)
}

从上面方法的定义可以看出,SessionCatalog又依赖externalCatalog来实现的。SessionCatalog中的externalCatalog是在创建SessionCatalog的时候,通过参数传入的。对于ExternalCatalog,我们之后再讨论,这里只是先关注 CatalogImpl和SessionCatalog。

除了SessionCatalog,在CatalogImp中还会用到 sparkSession.sessinState.sqlParser,用来解析Table、Function等对象的id,如:

1
2
sparkSession.sessionState.sqlParser.parseTableIdentifier
sparkSession.sessionState.sqlParser.parseFunctionIdentifier

creatTable
CatalogImpl中对 createTable的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
override def createTable(
tableName: String,
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
val storage = DataSource.buildStorageFormatFromOptions(options)
val tableType = if (storage.locationUri.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}
val tableDesc = CatalogTable(
identifier = tableIdent,
tableType = tableType,
storage = storage,
schema = schema,
provider = Some(source)
)
val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
sparkSession.sessionState.executePlan(plan).toRdd
sparkSession.table(tableIdent)
}

python_study

发表于 2020-07-09   |   分类于 python

记录Python常用的组件

redis

redis模块用于Python连接redis数据库。redis提供了两个类Redis和StrictRedis用于实现Redis的命令。

安装redis

1
pip install redis

基本使用

1
2
3
4
# 连接服务
import redis
client = redis.Redis(host='localhost', port=6739, decode_responses=True)

decode_responses=True,表示写入的键值对中的value为字符串类型,为False(默认值)表示写入字节类型。
除了直连方式,redis还提供了连接池功能,这样可以更优的使用连接:

1
2
3
import redis
clientPool = redis.ConnectionPool(host='host', port=6397, decode_responses=True)

set命令

set命令的语法为:set(name, value, ex=None, px=None nx=False, xx=False)

ex:过期时间-秒
nx:过期时间-毫秒
px:设置True时,只有name不存在时才执行
xx:设置True时,只有name存在时才执行

其他命令 >> https://www.jianshu.com/p/2639549bedc8

mysql

MySQLdb是用于Pyton连接Mysql数据库的接口,它实现了Python数据库API规范V2.0,基于MySQL C API上建立的。

安装MySQLdb

在通过如下脚本引入 MySQLdb 模块时

1
2
3
4
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import MySQLdb

如果发生如下报错:

1
2
3
4
Traceback (most recent call last):
File "test.py", line 3, in <module>
import MySQLdb
ImportError: No module named MySQLdb

则说明,当前python环境中没有安装 MySQLdb模块。

安装MySQLdb可以参考“https://pypi.python.org/pypi/MySQL-python”,基本上执行

1
pip install MySQL-python

命令就可以了。

基本使用

MySQLdb的使用指南:https://mysqlclient.readthedocs.io/user_guide.html#mysqldb
使用这个模块,首先就是导入:

1
2
3
4
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import MySQLdb

然后调用MySQLdb的connect方法与server建立连接:

1
2
3
4
5
# 创建连接
conn = MySQLdb.connect("host", "userName", "password", "DB-name", port, charset='utf8' )
# 为了防止连接泄漏,不用的时候需要关闭
conn.close()

建立连接后,就可以用来创建游标,并使用游标来进行各种操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 调用cursor方法获取游标
cursor = conn.cursor()
# 调用游标执行sql
cursor.execute("sql...")
# 对于查询sql的执行,通过游标获取数据
row = cursor.fetchone()
rows = cursor.fetchall()
c = cursor.rowcount()
# 对于执行写入的sql,需要连接提交
conn.commit()
# 如果写入失败,需要连接回滚
conn.rollback()

使用用例

Zookeeper

Python中用于操作Zookeeper的模块是 kazoo。

安装kazoo

基本使用

要想使用kazoo,需要先进行导入

1
from kazoo.client import KazooClient,KazooState

然后便可以创建Zookeeper客户端

1
2
3
4
# 创建客户端
zk = KazooClient(host='127.0.12.1:2181,127.0.0.1:2181', timeout=10, logger=logging)
# 启动后开始和server心跳
zk.start()

timeout 是连接超时时间
logger 是用来进行日志输出的日志对象

1
2
3
4
5
6
7
8
9
10
11
12
# 获取节点数据和状态
data, stat = zk.get('/myZnode')
# 获取子节点
children = zk.get_children("/")
# 监听节点变化
def zkEvent(event):
print type(event)
print event
zk.exists("/zNode", zkEvent)

其他方法可以参考kazoo的文档

使用用例

logging

loggine模块用来处理日志信息。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
需要使用logging模块,首先要引入
import loggine
# 调用 logging的方法来记录不同级别的日志
logging.warning("here is warning")
logging.info("here is info")
# 如果不设置,日志的数据级别默认为waring,如下设置日志的输出级别,以及日志格式
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(name)s:%s(levelname)s:%(message)s')
# 没有创建logger,默认是root logger,我们可以创建自己的logger
logger = logging.getLogger("kafka")
logger.setLevel(logging.DEBUG)
# 日志信息是否向上传播
logger.propagate = False
# 添加Handler,用于处理日志的输出位置
kafka_log_handler = logging.FileHandler(filename="myKafka.log")
# 定义日志格式
kafka_formatter = logging.Formatter('%(asctime)s:%(name)s:%s(levelname)s:%(message)s')
# 将formatter赋值给handler,将handler赋值给 logger
kafka_log_handler.setFormatter(kafka_formatter)
logger.addHandler(kafka_log_handler)
# 定义一个继承前面kafka的logger
logger_c = logging.getLogger("kafka.child")
# 这样logger_c的日志也会输出到 kafka的 logger中。

上面使用过实例化logging.logger,然后给logger添加所需的设置,除此之外,我们还可以通过

1
logging.config.dictConfig来从配置文件生成logger

配置文件定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
PATTERN = {
'version': 1,
'formatters': {
'normal': {
'format': '%(name)s %(asctime)s %(levelname)s %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
},
'raw': {
'format': '%(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'stream': 'ext://sys.stdout',
'formatter': 'normal',
},
'root': {
'class': 'logging.handlers.WatchedFileHandler',
'formatter': 'normal',
'filename': RUNTIME_HOME + '/var/log/root.log',
'mode': 'a',
'level': 'INFO',
},
'extapi': {
'class': 'logging.handlers.WatchedFileHandler',
'formatter': 'normal',
'filename': RUNTIME_HOME + '/var/log/ext_api.log',
'mode': 'a',
'level': 'DEBUG',
},
'api': {
'class': 'logging.handlers.WatchedFileHandler',
'formatter': 'normal',
'filename': RUNTIME_HOME + '/var/log/api.log',
'mode': 'a',
'level': 'DEBUG',
},
},
'loggers': {
'API': {'level': 'DEBUG',
'handlers': ['api'],
},
'EXTAPI': {'level': 'DEBUG',
'handlers': ['extapi'],
},
'requests.packages.urllib3.connectionpool': {'level': 'ERROR'},
},
'root': {
'handlers': ['root',],
'level': 'INFO',
}
}

然后使用配置文件初始化logging

1
2
3
4
logging.config.dictConfig(log_config.PATTERN)
# 然后通过getLogger方法获取logger
api_logger = logging.getLogger("API")

Formatter的设置

%(levelno)s:打印日志的级别数值信息。
%(levelname)s:打印日志的级别名称。
%(pathname)s:打印当前执行程序的路径信息。
%(filenae)s:打印当前执行程序的名字。
%(funcName)s:打印当前的函数名。
%(lineno)d:打印日志的当前行号。
%(asctime)s:打印日志的时间。
%(thread)d:打印线程ID。
%(threadName)s:打印线程名称。
%(process)d:打印进程ID。
%(message)s:打印日志信息。

日志的滚动和日志保留个数

使用TimedRotatingFileHandler可以设置日志的滚动方式以及保留的日志文件个数。

1
2
log_file_handler = TimedRotatingFileHandle(filename='roll_log', when='M', interval=2, backupCount=1)
# filename是日志文件的名字, when是日志滚动的时间单位,interval是日志滚动的间隔,backupCount是日志文件保留的个数。

when的取值

S:秒
M:分钟
H:小时
D:天
W:周
midnight:午夜。
如果选择D,不是以日期天来滚动的,是以启动日志时,24小时为一天算。

参考文档:https://docs.python.org/2/library/logging.config.html#logging-config-dictschema

spark

发表于 2020-07-06   |   分类于 bigData

记录spark的一些参数的使用

Split的相关参数

hive.exec.orc.split.strategy 指定Split的策略
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 默认值为Integer.MAX_VALUE
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 默认值为1

Spark做Orc文件的Split基本上还是使用Hive来实现的。默认的策略为HYBRID,可以修改为BI或ETL。具体解释如下(拷贝过来的):hive.exec.orc.split.strategy参数控制在读取ORC表时生成split的策略。BI策略以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 1024 1024)时使用ETL策略,否则使用BI策略。 — 对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。— 另外,spark.hadoop.mapreduce.input.fileinputformat.split.minsize参数可以控制在ORC切分时stripe的合并处理。具体逻辑是,当几个stripe的大小小于spark.hadoop.mapreduce.input.fileinputformat.split.minsize时,会合并到一个task中处理。可以适当调小该值,以此增大读ORC表的并发。

Shuffle相关

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

什么时候用这个参数?在界面上看某个stage中tasks列表时,如果tasks的“shuffle read size / record”列特别不平均,则可以使用这个参数进行调优。
该参数只能用于Spark-sql任务。

tmux

发表于 2020-07-04   |   分类于 Unix

本文记录tmux的一些常用命令

基本介绍

Tumx就是会话与窗口的解绑工具,将它们彻底分离。

允许在单个窗口中,同时访问多个会话。这对于同时运行多个命令程序很有用。
可以让新窗口“接入”已经存在的会话。
允许每个会话有多个连接窗口,因此可以多人实时共享会话。
支持窗口的任意垂直和水平拆分。

安装

CentOS: yum install tmux
MAC:brew install tmux

基本的启动和退出

tmux命令即可以新建一个会话。
显示的输入exit 或 ctrl+b d就可以断开与tmux会话的连接。
ctrl+b ? 显示帮助信息。

会话管理

新建一个会话

直接使用tmux创建的会话是没有名字,但是会有一个编号,编号根据创建的顺序从0开始累加。如果想要创建一个带有自定义名字的会话,可以使用 tmux new -s session-name。

与会话脱离

从Tmux会话窗口中,可以按下 ctrl+b d 或输入 tmux detach。

列出主机上所处于活跃的tmux会话

使用 tmux ls命令就可以列出机器上当前用户还处于未关闭的tmux会话。

接入存在存在的tmux会话

使用 tmux a -t session-name 或 tmux attach -t session-name可以与名字为session-name的tmux会话进行连接。

关闭tmux会话

tmux kill-session -t session-name 可以将存在但是未连接的tumx会话关闭。如果处于连接状态的tumx,可以直接退出即可。

切换tumx会话

tmux switch -t session-name 就可以切换到指定的tmux会话中。

重命名会话

tmux rename-session -t old-name new-name就可以对tmux会话重新命名。

快捷键

Ctrl+b d: 分离当前tmux会话
Ctrl+b s: 列出所有会话
Ctal+b $: 重命名当前会话

窗格操作

拆分窗格

tmux split-window 上下拆分窗格
tmux split-window -h 左右拆分窗格

窗格间移动光标

tmux select-pane -U 移动到上方窗格。
tmux select-pane -D 移动到下方窗格。
tmux select-pane -L 移动到左边窗格。
tmux select-pane -R 移动到右边窗格。

窗口快捷键

Ctrl+b % :左右划分两个窗格
Ctrl+b “ :上下划分两个窗格
Ctrl+b :光标切换到其他窗格
Ctrl+b ; :切换到上一个窗格
Ctrl+b o :切换到下一个窗格
Ctrl+b { :当前窗格左移
Ctrl+b } :当前窗格右移
Ctrl+b x :关闭当前窗格
Ctrl+b ! :将当前窗格拆分成一个独立的窗口
Ctrl+b z :当前窗格全屏显示,在使用一次变回原来大小
Ctrl+b q :显示窗格编号。
Ctrl+b [ :进入窗格滚动状态,按q退出。Ctrl+b [是进入了copy-modo。
Ctrl+b setw -g moe-mouse on

zookeeper

发表于 2020-06-29   |   分类于 zookeeper

ZooKeeper的基本使用

zookeeper的常用服务端命令

服务启动命令

sh ${ZOOKEEPER_HOME}/bin/zkServer.sh start

服务停止命令

sh ${ZOOKEEPER_HOME}/bin/zkServer.sh stop

服务重启命令

sh ${ZOOKEEPER_HOME}/bin/zkServer.sh restart

服务状态命令

sh ${ZOOKEEPER_HOME}/bin/zkServer.sh status

连接服务命令

sh ${ZOOKEEPER_HOME}/bin/zkCli.sh -server host:port

zookeeper的常用客户端命令

ls 命令

ls命令用于列出指定ZNode的子节点

create 命令

create命令用于创建指定的ZNode。

get 命令

get命令用于获取指定ZNode的值。

set 命令

set命令用于设置指定ZNode的值。

delete命令

delete命令用于删除指定的ZNode,不能删除子ZNode不为空的ZNode。

rmr 递归删除节点

使用rmr命令可以删除节点以及节点的子节点。

quit 命令

quit用于退出当前科幻。

数据状态信息

cZxid:节点被创建时使用的事务id - 不变
ctime: 节点的创建时间 - 不变
mZxid: 节点最后一次被修改使用的事务id - 改变
mtime:节点最后一次被修改的时间 - 改变
pZxid:子节点id。
cversion:子节点version
dataVersion: 当前节点数据版本号,数据修改一次,版本号增加一。
aclVersion:节点ACL的版本,ACL修改一次,版本号增加一。
ephemeralOwner:临时节点所归属的Session。
dataLength:数据长度。
numChildren:子节点的个数。

zookeeper的四字母命令

命令 描述
conf (New in 3.3.0)输出相关服务配置的详细信息。比如端口、zk数据及日志配置路径、最大连接数,session超时时间、serverId等
cons (New in 3.3.0)列出所有连接到这台服务器的客户端连接/会话的详细信息。包括“接受/发送”的包数量、session id 、操作延迟、最后的操作执行等信息。
crst (New in 3.3.0)重置当前这台服务器所有连接/会话的统计信息
dump 列出未经处理的会话和临时节点(只在leader上有效)
envi 输出关于服务器的环境详细信息(不同于conf命令),比如host.name、java.version、java.home、user.dir=/data/zookeeper-3.4.6/bin之类信息
reqs 列出未经处理的请求
ruok 测试服务是否处于正确状态。如果正常,返回“imok”
srst 重置服务器的统计信息
srvr (New in 3.3.0)输出服务器的详细信息。zk版本、接收/发送包数量、连接数、模式(leader/follower)、节点总数。
stat 输出关于性能和连接的客户端的列表
wchs 列出Server的watch的详细信息
wchc 通过Session列出服务器watch的详细信息,他的输出是一个与watch相关的会话的列表。小心使用。
wchp 通过路径列出服务器watch的详细信息。它输出一个与Session相关的路径。小心使用。
mntr (New in 3.4.0)列出集群的健康状态。包括“接受/发送”的包数量、操作延迟、当前服务模式(leader/follower)、节点总数、watch总数、临时节点总数。

如何使用四字命令

1
2
3
4
echo conf | nc host port
如:
echo stat | nc zookeeper.com 2181

比如可以使用stat来查看当前server节点的连接信息,从而找出连接数太多的server。

权限管理

ACL

ACL的特性

ZooKeeper 的权限控制是基于每个 znode 节点的,需要对每个节点设置权限
每个 znode 支持设置多种权限控制方案和多个权限
子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

可控操作

ACL可以控制的具体操作有:

CREATE :(缩写c)可以创建子节点。
DELETE :(缩写d)可以删除子节点。
READ :(缩写r)可以读取节点数据及显示子节点列表。
WRITE :(缩写w)可以设置节点数据。
ADMIN :(缩写a)可以设置节点访问控制列表权限。

控制模式

ACL以如下模式进行ACL控制:

world :只有一个用户:anyone,代表所有人(默认)。
ip :使用IP地址认证。
auth :使用以添加认证的用户认证。
digest :使用“用户名:密码”认证。
sasl:Kerberos的认证方式

设置ACL的命令为setAcl,此命令后面跟的是znode和需要设置的Acl信息,如:setAcl / ip:127.0.0.1:cdwra。其中的Acl信息包含三部分内容:Acl模式:模式信息:具体的权限。
对于world模式的设置为:world:anyone:cdrwa,其中world:anyone是固定的,后面的cdrwa可以根据自己的需求进行设置。对于ip模式的设置为:ip:127.0.0.1:cdwra,ip指定了ACL的模式,模式信息为ip地址,最后是具体的权限。auth模式的设置为: auth:userName:cdrwa,其中auth指定了ACL的模式,userName指定需要使用的认证的用户名,最后是具体的权限,这里的第二部分的userName是指通过addauth userName:password命令在server上通过鉴权用户名,也就是使用server中已经鉴权过的用户的信息进行设置。digest模式的设置为:digest:userName:password:cdwra,digest指定了ACL的模式,userName:password是通过特殊处理的账号和密码(下面会介绍),最后一部分是具体的权限。

通过 setAcl /path digest:userName:password:acl 可以设置 digest类型的ACL。这里的密码是经过SHA1及BASE64处理过的密文,在Shell中如下命令来获取:

1
echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64

对于auth和digest两种方式设置的ACL,需要通过addauth添加认证信息后,才可以操作。

相关命令

与ACL相关的命令包括如下:

getAcl:获取指定ZNode的ACL信息。
setAcl:设置制定ZNode的ACL信息。
addauth: 为当前连接增加认证信息。

Kerberos

ZooKeeper中使用Kerberos可以用来控制三个点:1、Server之间的连接;2、Client与Server之间的连接;3、ZNode的ACL。如果要在ZooKeeper中启用Kerberos,需要进行如下配置:
首先需要在zoo.cfg中增加如下配置:

1
2
3
4
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true

在ZooKeeper的配置目录下(/opt/zookeeper/conf)添加环境文件“java.env”,其内容为:

1
export JVMFLAGS="-Djava.security.auth.login.config=/home/zookeeper/zookeeper-3.5.1-alpha/conf/jaas.conf" // 此处指定jaas.conf文件

在ZooKeeper的配置目录下(/opt/zookeeper/conf)添加配置文件“jaas.conf”,其内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/zookeeper/zookeeper-3.5.1-alpha/conf/zookeeper.keytab" // 指定keyTab文件
storeKey=true
useTicketCache=false
principal="zookeeper/zdh-237@ZDH.COM"; // 指定principal文件
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/zookeeper/zookeeper-3.5.1-alpha/conf/zookeeper.keytab" // 指定keyTab文件
storeKey=true
useTicketCache=false
principal="zookeeper/zdh-237@ZDH.COM"; // 指定principal文件
};

这里包含了server的和client的,如果不需要client,可以只保留server 的。
另外,开启了Kerberos的ZooKeeper集群是可以通过非Kerberos方式连接的,server会自动降级。

超级管理员角色

默认的ZK Server在启动的时候,是没有超级管理员的权限的,要增加超级管理员的权限,我们可以在zkServer.sh

1
2
3
4
5
6
7
8
9
10
11
12
case $1 in
start)
echo -n "Starting zookeeper ... "
if [ -f "$ZOOPIDFILE" ]; then
if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
echo $command already running as process `cat "$ZOOPIDFILE"`.
exit 0
fi
fi
nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
#TODO
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

中的“#TODO”位置增加

1
"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:crJlNpw5TWXtn9f8LCsep65WWyg=" \

这样一段配置,那么在连接到这个节点后,使用addauth digest super:password进行鉴权,然后就有超级管理员的权限啦。
超级管理员可以使用的场景设置ACL的账号密码忘记了,或者想要操作由Kerberos管理的的ZNode。

Python 操作zookeeper

首先需要安装 kazoo

1
pip install kazoo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/python
import sys
from kazoo.client import KazooClient,KazooState
zk = KazooClient(
hosts='hadoop2035.rz.momo.com:2182'
,timeout=10.0
)
zk.start()
path = "/databridgemanager_mirror/dataDir/finished"
children = zk.get_children(path);
zk.stop()
zk.close()

问题排查

Session Timeout

在ZooKeeper的使用过程中,经常遇到的一个问题是 Session Timeout,在我们的使用场景中,Kafka的某些节点总是遇到Session Timeout从而导致leader切换。

ZooKeeper的监控

kerberos

发表于 2019-12-05

在Spark启用Kerberos之后,遇到了这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
19/12/05 11:57:49 ERROR TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)]
at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:199)
at org.apache.thrift.transport.TSaslTransport$SaslParticipant.evaluateChallengeOrResponse(TSaslTransport.java:539)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:283)
at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
at org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge$Server$TUGIAssumingTransportFactory$1.run(HadoopThriftAuthBridge.java:739)
at org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge$Server$TUGIAssumingTransportFactory$1.run(HadoopThriftAuthBridge.java:736)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
at org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge$Server$TUGIAssumingTransportFactory.getTransport(HadoopThriftAuthBridge.java:736)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:269)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)
at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:856)
at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:342)
at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:285)
at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:167)
... 14 more
Caused by: KrbException: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled
at sun.security.krb5.EncryptionKey.findKey(EncryptionKey.java:522)
at sun.security.krb5.KrbApReq.authenticate(KrbApReq.java:273)
at sun.security.krb5.KrbApReq.<init>(KrbApReq.java:149)
at sun.security.jgss.krb5.InitSecContextToken.<init>(InitSecContextToken.java:108)
at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:829)
... 17 more

一脸懵逼,在网上查各种资料,找到如下解决方法:

1
locate 'jre/lib/security' | grep 'lib/security$'

会列出如下信息

1
2
3
4
/opt/jdk1.7.0_71/jre/lib/security
/opt/jdk1.8.0_121/jre/lib/security
/usr/lib/jvm/java-1.5.0-gcj-1.5.0.0/jre/lib/security
/usr/lib/jvm/java-1.8.0-oracle-1.8.0.102.x86_64/jre/lib/security

根据自己使用的JDK版本,去到对应的目录,我使用的是1.8,因此去/opt/jdk1.8.0_121/jre/lib/security目录,如果不存在上面的目录,比较麻烦喽:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ cd /opt/jdk1.8.0_121/jre/lib/security
$ ll
-rw-r--r-- 1 user user 3890 Jul 6 2017 blacklist
-rw-r--r-- 1 user user 93987 Aug 28 2017 cacerts
-rw-r--r-- 1 user user 490 Jul 6 2017 cacerts.20160121
-rw-r--r-- 1 user user 93987 Aug 28 2017 cacerts-bak
-rw-r--r-- 1 user user 490 Jul 6 2017 cacerts.bak
-rw-r--r-- 1 user user 158 Jul 6 2017 javafx.policy
-rw-r--r-- 1 user user 2593 Jul 6 2017 java.policy
-rw-r--r-- 1 user user 17838 Jul 6 2017 java.security
-rw-r--r-- 1 user user 98 Jul 6 2017 javaws.policy
-rw-r--r-- 1 user user 2500 Jul 6 2017 local_policy.jar
-rw-r--r-- 1 user user 0 Jul 6 2017 trusted.libraries
-rw-r--r-- 1 user user 2487 Jul 6 2017 US_export_policy.jar

注意其中的 local_policy.jar 和 US_export_policy.jar,需要使用正确的jar。就可以解决啦。

对于没有上面目录的情况(没有试过,仅作为一种解决方法记录):
则需要下载JCE.jar文件到目录:/opt/jdk1.8.0_121/jre/lib/security。
JCE.zip文件包括如下内容(1.7版本)

1
2
3
4
5
$ ls -l UnlimitedJCEPolicy
total 16
-rw-rw-r-- 1 root root 2500 May 31 2011 local_policy.jar
-rw-r--r-- 1 root root 7289 May 31 2011 README.txt
-rw-rw-r-- 1 root root 2487 May 31 2011 US_export_policy.jar

然后就正常添加就可以了。

Hive Common Operation

发表于 2019-10-31   |   分类于 Big Data

本文记录hive常用的一些命令和操作,以及使用过程的注意点。

使用beeline进行连接:

!connect jdbc:hive2://hadoopxxx.com:10000 user password

查看Hive中表的信息
desc formatted online.mytabl_name partition(partition_date=’20191002’);

创建带有分区的表
create table online.mytable_name (id string) partitioned by (partition_date string);

查询并将结果写入到表
insert into table online.mytable_name partition(partition_date=’20191002’) select momo_id from online.mytable_name_from where partition_date=’20191002’;

Spark-launch-Executor

发表于 2019-09-02

启动开始

在Yarn调度模式中,Executor的启动是从YarnAllocator的runAllocatedContainers方法开始的。
以下是这个方法的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Launches executors in the allocated containers.
*/
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
... 此处是记录各种状态
if (numExecutorsRunning.get < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
updateInternalState()
} catch {
... 异常的处理
}
}
})
} else {
// For test only
updateInternalState()
}
} else {
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
"reached target Executors count: %d.").format(
numExecutorsRunning.get, targetNumExecutors))
}
}
}

此处的执行逻辑是,循环要使用的container信息,如果当前运行的Executor个数小于想要启动(目标)的Executor个数(启动中的不算),那么就让launcherPool启动一个ExecutorRunnable。

ExecutorRunnable的定义

这里需要看一下ExecutorRunnable的定义:

1
2
3
4
5
6
7
8
9
10
11
12
private[yarn] class ExecutorRunnable(
container: Option[Container],
conf: YarnConfiguration,
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
appId: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]) extends Logging

在ExecutorRunnable的定义中,它需要如下参数:Container、YarnConfiguration、SparkConf、Master地址、executor的id、hostname、executor内存大小、executor的cpu核数、application的id、SecurityManager和本地资源信息。

Runnable的参数确定

这里先看一下YarnAllocator传递的哪些参数

Container

container 来自containersToUse集合中,containerToUse是如下得到的:

1
2
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()

也就是说是通过AMRMClient调用AM的服务后分配的。

YarnConfiguration

YarnConfiguration是来自于YarnAllocator的构造参数:

其中第三个参数conf就是YarnConfiguration,这个conf是来自YarnRMClient的register方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def register(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator = {
... // 省略其他操作
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
}

而register方法是在ApplicationMaster的registerAM方法中调用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private def registerAM(
_sparkConf: SparkConf,
_rpcEnv: RpcEnv,
driverRef: RpcEndpointRef,
uiAddress: Option[String],
securityMgr: SecurityManager) = {
allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
securityMgr,
localResources)
}

yarnConf是在ApplicationMaster中定义的:

1
2
private val sparkConf = new SparkConf()
private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)

1
2
3
4
5
def newConfiguration(conf: SparkConf): Configuration = {
val hadoopConf = new Configuration()
appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
hadoopConf
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
// Note: this null check is around more than just access to the "conf" object to maintain
// the behavior of the old implementation of this code, for backwards compatibility.
if (conf != null) {
// Explicitly check for S3 environment variables
val keyId = System.getenv("AWS_ACCESS_KEY_ID")
val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
if (keyId != null && accessKey != null) {
hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
hadoopConf.set("fs.s3a.access.key", keyId)
hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
hadoopConf.set("fs.s3a.secret.key", accessKey)
val sessionToken = System.getenv("AWS_SESSION_TOKEN")
if (sessionToken != null) {
hadoopConf.set("fs.s3a.session.token", sessionToken)
}
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
}
}

所以,从代码可以看出,yarnConf其实是一个“org.apache.hadoop.conf.Configuration”,它包含三部分内容:1、上面的S3的属性。2、Spark配置中以“spark.hadoop.”开头的配置。3、特定的“io.file.buffer.size”属性。

SparkConf

ExecutorRunnable需要的第三个参数是SparkConf。这个参数也是来自YarnAllocator的构造方法,和YarnConfiguration一样是从YarnRMClient的register方法的参数那里得到的。继续向上找,这个参数是来自ApplicationMaster的registerAM方法,对于registerAM方法,有两个地方调用,而这两个方法传递的SparkConf也是不一样的:driver启动和Executor启动。对于driver启动和Executor的启动,区别是是否是集群模式(isClusterModel),这两个方法只会执行一种:如果是集群模式(isClusterModel==true),则driver启动,否则Executor启动。

driver启动

如果是集群模式(isClusterModel == true),就会调用runDriver方法,在runDriver方法中会调用registerAM方法。这里的SparkConf是从SparkContext中得到的:

1
2
3
4
val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS))
...
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
}

对于driver的方式的SparkContext的获取需要之后详细了解。

Executor启动

如果是非集群模式(isClusterModel==false),就会调用runExecutorLauncher方法,在这个方法中会调用registerAM。这里传递的SparkConf就比较直观了,就是在ApplicationMaster类中实例化的SparkConf:

1
private val sparkConf = new SparkConf()

masterAddress

ExecutorRunnable的第四个参数是masterAddress,一个字符串类型参数。但是在yarnAllocator的runAllocatedContainers方法中生成ExecutorRunnable时传递的是“driverUrl”,这个“driverUrl”是来自YarnAllocator的构造方法的“driverUrl”,是来自ApplicationMaster的registerAM方法中生成的:

1
val driverUrl = RpcEndpointAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

这里的_sparkConf是registerAM方法的参数,和上面的(第三个参数)SparkConf来源一致。

executorId

ExecutorRunnable的第五个参数是executorId,一个字符串类型的参数。executorId虽然是字符串类型,其实就是一个数字:

1
2
executorIdCounter += 1
val executorId = executorIdCounter.toString

而executorIdCounter是从driver那里拿到的:

1
private var executorIdCounter: Int = driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

hostname

ExecutorRunnable的第六个参数是hostname,一个字符串类型的参数。实际上它代表的是executor的hostname,在YarnAllocator的runAllocatedContainers方法中生成:

1
val executorHostname = container.getNodeId.getHost

executorMemory

ExecutorRunnable的第七个参数是executorMemory,一个整型类型的参数。指定了Executor可以使用的内存大小(什么的内存???)。该值是通过参数进行配置的:

1
2
3
4
5
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

从spark.executor.memory中读取,默认值为1GB。

executorCores

ExecutorRunnable的第八个参数是executorCores,一个整型参数。指定了Executor可以使用的cpu核数。该值是通过参数进行配置的:

1
2
3
4
5
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
.intConf
.createWithDefault(1)

从spark.executor.cores中读取,默认为1。该值会影响Executor中tasks的并发数。

appId

ExecutorRunnable的第九个参数是appId,一个字符串类型参数。它是通过YarnAllocator的构造参数ApplicationAttemptId对象得到的:

1
appAttemptId.getApplicationId.toString

ApplicationAttemptId对象是在YarnRMClient中getAttemptId()方法得到的:

1
2
3
def getAttemptId(): ApplicationAttemptId = {
YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
}

SecurityManager

ExecutorRunnable的第十个参数是SecurityManager对象。它也是通过一系列的方法传递过来的,最开始是在runDriver和runExecutorLauncher,但是这里和SparkConf不一样,这两个方法使用的SecurityManager是一样的。

1
2
3
4
5
if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}

这两个方法引用的是同一个SecurityManager:

1
val securityMgr = new SecurityManager(sparkConf)

localResources

ExecutorRunnable的最后一个参数是localResources,是一个Map[String, LocalResource]类型集合。localResources在ApplicatinMaster类中进行定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private val localResources = {
val resources = HashMap[String, LocalResource]()
def setupDistributedCache(
file: String,
rtype: LocalResourceType,
timestamp: String,
size: String,
vis: String): Unit = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(rtype)
amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
amJarRsrc.setTimestamp(timestamp.toLong)
amJarRsrc.setSize(size.toLong)
val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
resources(fileName) = amJarRsrc
}
val distFiles = sparkConf.get(CACHED_FILES)
val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
val resTypes = sparkConf.get(CACHED_FILES_TYPES)
for (i <- 0 to distFiles.size - 1) {
val resType = LocalResourceType.valueOf(resTypes(i))
setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
visibilities(i))
}
sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
val uri = new URI(path)
val fs = FileSystem.get(uri, yarnConf)
val status = fs.getFileStatus(new Path(uri))
// SPARK-16080: Make sure to use the correct name for the destination when distributing the
// conf archive to executors.
val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
Client.LOCALIZED_CONF_DIR)
setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
status.getModificationTime().toString, status.getLen.toString,
LocalResourceVisibility.PRIVATE.name())
}
CACHE_CONFIGS.foreach { e =>
sparkConf.remove(e)
sys.props.remove(e.key)
}
resources.toMap
}

localResources加载和缓存的数据我们之后在介绍,这里只是梳理各个参数的定义。

启动

生成ExecutorRunnable,然后调用run方法,就可以启动一个Container(Executor)了。runAllocatedContainers方法中也是调用ExecutorRunnable的run方法来启动ExecutorRunnable的。ExecutorRunnable的run方法如下定义:

1
2
3
4
5
6
7
def run(): Unit = {
logDebug("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
startContainer()
}

在这个方法中,先是创建一个NMClient,用YarnConfiguration进行初始化,并启动。接着调用 startContainer方法启动Container。

Executor

发表于 2019-09-02

yarn-learn

发表于 2019-05-05   |   分类于 yarn

本文用来记录自己读书的一些笔记

YARN 组件的功能概述

YARN集群主要分为三个部分:ResourceManager、NodeManager和ApplicationMaster。其中ResourceManager的重要组成部分为调度器和ApplicationMaster。

ResourceManager作为独立的进程运行在专有的机器上,负责集群中所有应用程序的资源分配。它可以为用户提供公平的、基于容量的、本地化的资源调度。在Yarn资源的分配单位为Container,它是一组内存和cpu核数的组合(目前只有内存和cpu)。RsoueceManager和运行在每个节点上的NodeManager会进行交互以便执行和跟踪资源的分配。基于可扩展性需求,ResourceManager和NodeManager之间通过心跳进行通信。NodeManager负责本地可用资源的监控,故障报告以及Container生命周期的管理(启动或终止任务)。
用户将Application提交给ResourceManager,被ResourceManager接受的Applcation会被传递给Scheduler并允许其运行。一旦Scheduler有足够的资源可以满足需求,Application的状态就会从Accepted转为Running。ResourceManager会为ApplicationMaster分配一个Container,ApplicationMaster通常被称为“Container0”.
ApplicationMaster是每个用户作业的主进程,负责管理作业的生命周期,包括动态的增加或减少Container,管理执行流程,处理故障和计算偏差,以及执行其他的本地优化。
ApplicationMaster可以运行以任何编程语言实现的用户程序。通常,ApplicationMaster需要利用多台服务器的处理能力来完成一个作业,因此ApplicationMaster会向ResourceManager进行资源请求。这些资源会包含本地化优势和Container的容量(内存和cpu)。ResourceManager根据可用资源和调度策略来为每个Application分配资源。当一个Container被分配给一个ApplicationMaster时,ResourceManager为该资源生成一个租约,ApplicationMaster通过心跳会得到该租约。基于令牌的机制,保证了ApplicationMaster在NodeManager上使用Container的可靠性。Container在运行过程中,会通过特定协议与ApplicationMaster通信,来报告状态和健康信息,以及接受框架的特定指令(杀掉任务等)。通过这种方式,Yarn提供了对Container的监控和生命周期管理的基础框架,而应用程序特定语义由每个框架独立管理。

ResourceManager

ResourceManager是集群所有资源的仲裁者。它的主要职责就是调度,即在竞争的应用程序之间分配系统中的可用资源,但是它不关心每个应用程序的状态。调度器只处理应用程序的整体资源分配,不关心局部优化和内部流程。ResouceManager不负责的职能,它没有跟踪应用程序的执行流程,没有任务容错能力。

Yarn调度器

Yarn有一个可插拔的调度器组件,根据不同的使用场景和用户需求,管理员可以选择不同的调度策略,目前支持FIFO、Capacity和Fair。具体使用哪种调度器可以在yarn-default.xml中设置。

FIFO

先进先出调度器,它不考虑作业的优先级和范围。FIFO比较适合低负载集群,当使用大型共享集群时,它的功能不佳。

Capacity

Capacity调度器允许多个组安全的共享一个大规模Hadoop集群。要使用Capacity调度器,管理员使用总槽位容量的预定值配置一个或多个队列。这种分配保证了每个队列的最小使用量。管理员为每个队列的可用资源容量配置软限制和可选的硬限制。每个队列有严格的ACL,用来控制那些用户可以向那些队列提交作业。同时也有措施来保证无法查看或修改其他用户的应用程序。
Capacity调度器允许共享集群,同时给每个用户或组一定的最小容量保证。这些最小值在不需要时可以放弃,超出的容量将会给予那些最饥饿的队列,饥饿程度用运行中或已用的队列容量来衡量。
队列的定义和属性可以由管理员以安全的方式,在运行期间修改,以尽量减少对用户的干扰。管理员可以在运行期间添加额外的队列。管理不能在运行期间删除已有队列,但是可以在运行期间停止队列,以确保现有application运行完毕后不会再有新的应用程序被提交。
当工作负载可预见的情况下,Capacity调度器效果最好,有助于分配最小容量。在每个队列内部,使用层次化的FIFO来调度多个Application,类似于在独立的FIFO调度器中使用的方式。

Fair调度器

Fair调度器是将资源公平分配给应用的方法,使得所有应用在平均情况下随着时间得到相等的份额。
在Fair调度模型中,每个Application都属于某一个队列。Yarn Container的分配是选择使用了最少资源的队列,在这个队列中,再选择使用最少资源的应用程序。默认情况下,所有的用户共享一个名为“default”的队列。应用程序可以在提交时指定想要添加的队列。另外,也可以将Fair调度器配置成根据请求中包含的用户名来分配队列。Fair调度器还支持许多功能,如队列的权重、最小份额、最大份额以及队列内FIFO策略,但基本原则就是尽可能平均共享资源。
Fair调度器也支持抢占的概念,从而可以从ApplicationMaster那里要回Container,并且根据配置和应用程序的设计,抢占和随后的资源分配可以是友好的或者强制的。
除了提供平均共享,Fair调度器还允许保证队列的最小份额,确保某些用户、组或者生产应用程序总能够得到足够的资源。当队列中有等待的Application时,它至少能够得到最小份额的资源。Fair调度器可以通过配置文件限制每个用户和每个对了中运行Application的数量。
Fair调度器允许Container请求一定量的内存资源。为了避免多个较小内存Application饿死一个较大内存应用,引入了“reserved Container”,如果由于内存不足,一个Applicaiton不能立即使用一个Container,可以将其保留给其他应用程序,这样,其他应用程序不能使用这个Container,直到它被释放。被保留的Container会等待其他本地Container被释放,然后使用这些额外的容量来完成这个作业。一个保留的Container只允许在一个节点上,并且一个节点只允许一个保留的Container。
Fair调度器也支持层次化队列,队列可以嵌套在其他队列中,每个队列将它的资源再以一种公平的调度方式分配给它的子队列。

Container

Container是单个节点上一组资源(内存、CPU等)的集合。单个NodeManager上可以有多个Container,Container由NodeManager监控,由ResourceManager调度。
每个Application都是从ApplicationMaster开始,而且这个ApplicationMaster本身也是一个Container(Container0)。启动后,ApplicationMaster会向ResourceManager请求更多的Container,在运行期间,可以动态的请求或释放Continer。

NodeManager

NodeManager的职责包括:与ResourceManager保持通信、管理Container的生命周期、监控每个Container的资源使用、跟踪节点健康状态、管理日志和不同应用程序的附属服务。
NodeManager启动时会向ResourceManager进行注册,然后发送包含自身状态的心跳,并等待来自ResourceManager的指令。
Contianer使用一个Container启动上下文来描述,这个描述包含环境变量、在远程存储上的依赖、安全令牌、NodeManager服务的載荷以及创建进程的必要命令。在验证了Container租约之后,NodeManager为Container配置环境,包括根据资源限制初始化它的监控。NodeManager可以杀死由ResourceManager指定的Container,或者资源超出限制的Container。

ApplicationMaster

ApplicationMaster是协调集群中应用程序执行的进程。每个Application都有自己独特的ApplicationMaster,负责与ResourceManager申请资源,并与NodeManager协同工作来执行和监控任务。
ApplicationMaster启动后,后周期性向ResourceManager发送心跳来报告自己的健康以及更新它资源需求。在建好需求模型后,ApplicationMaster在发送给ResourceManager的心跳中封装了它的偏好和限制。在随后的心跳应答中,ApplicationMaster会收到集群中特定节点上绑定了一定资源的container租约。根据Resourcemanager发来的租约,ApplicationMaster可以更新它的执行计划以适应资源的过剩或不足。Container可以在Application运行期间被申请或释放。

YARN资源模型

YARN资源分配模型提供了更大的灵活性,解决了静态分配的低效率问题。每个Container都有一些非静态资源,这些资源目前支持内存和CPU,还可以支持带宽GPU等。

客户端资源请求

Yarn应用程序是从客户端资源请求开始的。客户端先通知ResourceManager要提交一个Application,ResourceManager在应答中给出一个ApplicationID以及有助于客户端请求资源的集群容量信息。

ApplicationMaster分配Container

在得到ResourceManager的应答后,客户端使用“Application Submission Context”(包含ApplicationID、用户名、队列以及其他启动ApplicationMaster所需的信息)发送请求给ResourceManager,同时也会将“Container Launch Context”发送给ResourceManager。Container Launch Context中描述了资源需求(内存和CPU)、作业文件、安全令牌以及在节点上启动ApplicationMaster所需的其他信息。
ResourceManager收到“Application Submission Context”后,为ApplicationMaster调度一个可用的Container(Container0),然后启动ApplicationMaster,启动ApplicationMaster之后,ResourceManager会告诉ApplicationMaster当前的资源报告。
基于可用资源报告,ApplicationMaster会请求一定的Container。ResourceManager根据资源调度策略,尽可能最优(如本地优势)的分配Container,并作为资源请求的应答发送给ApplicationMaster。
在作业的执行过程中,ApplicationMaster会向ResourceManager发送心跳信息。ApplicationMaster还可以将申请和释放Container的信息包含在心跳中。当作业结束时会向RsourceManager发送完成信息并退出。

ApplicationMaster与Container管理器的通信

在这个阶段,ResourceManager已经将启动NodeManager的控制权交给了ApplicationMaster。ApplicationMaster将单独联系NodeManager并提供Container Launch Context(包含依赖文件、安全令牌以及启动进程所需的命令)。启动Container时,所有数据文件、可执行文件以及必要的依赖文件都会拷贝到节点的本地存储上,依赖文件可以被相同节点上相同Application的Container共享。
一旦启动Container,ApplicationMaster会检查它们的状态,ResourceManager不参与Application的执行,只处理调度以及监控其他资源。Container可以被ResourceManager给Kill掉,当Container被Kill掉之后,NodeManager会清理它的本地工作目录。如果是Applicaiton完成,ResourceManager会通知NodeManager聚合日志病清理Container专用的文件。

管理Application的依赖文件

当启动一个Container时,ApplicationMaster可以指定该Container所需文件,因此,这些文件会被本地化处理,而Yarn负责本地化处理的所有操作。

LocalResource的定义

这部分包含两个操作

Localization:拷贝/下载远程资源到本地文件系统的过程。
LocalResource:LocalResource代表运行Container所需的本地资源。NodeManager负责在启动Container之前将这些资源本地化。
LocalCache:NodeManager维护和管理所有已下载文件的几种本地缓存,这些资源基于最初拷贝该文件时使用的远程URL作为唯一标识。

对于每种资源,应用程序都可以指定下面的信息:

URL:待下载的LocalResource的远程地址。
Size:LocalResource的大小,以byte为单位。
Creation timestamp:资源在远程文件系统上创建的时间。
LocalResourceType:NodeManager本地化的资源类型,包括FILE、ARCHIVE和PATTERN。
Patten:用于从存档文件中提取条目的样式(只对PATTERN类型适用)。
LocalResourceVisibility:指定NodeManager本地化资源的可见性,包括:PUBLIC、PRIVATE和APPLICATION。

LocalResource时间戳

NodeManager在下载资源文件之前,会检查这些文件有没有被修改过,这个检查可以确保LocalResource的一致性–应用程序在整个运行期间使用相同的文件内容。
一旦文件从远程位置拷贝到NodeManager的本地磁盘,它失去了除了URL之外所有与原始文件的联系,不再对远程文件的修改进行跟踪。为了避免不一致的问题,Yarn会让依赖于被修改的远程文件的Container失败。
ApplicationMaster在一个节点上启动Container时,会向NodeManager指定资源的时间戳。

LocalResource类型

FILE:一个普通文件,文本或二进制文件。
ARCHIVE:压缩文件,会被NodeManager自动解压缩。目前可以识别jar、tar、tar.gz以及zip。
PATTERN:ARCHIVE和FILE类型的组合。目前只有JAR文件支持PATTERN类型。

LocalResource的可见性

由LocalResource-Visibility指定,LocalResource有三种可见性:

PUBLIC

标记为PUBLIC的LocalResource可以被任何用户的Container访问。这些文件被拷贝到公共的LocalCache,之后本地上任何Container都可以直接从这个公共的LocalCache中读取文件,在LocalCache被清除之前,都不需要再次下载文件。

PRIVATE

使用PRIVATE标记的LocalResource可以被节点上同一用户的应用程序共享。这些LocalResource被复制到特定用户的私有缓存中。由同一个用户的不同应用程序的所有Container都可以访问这些文件。

APPLICATION

使用APPLICATION标记的资源可以被同一节点上相同Application的Container所共享。这些LocalResource被复制到应用程序专有的LocalCache。

LocalResource的生命周期

不同类型的LocalResource具有不同的生命周期:

PUBLIC LocalResource 在Container或者应用程序结束时都不会被删除,但是在磁盘容量紧张时会删除。这个阈值可以通过yarn.nodemanager.localizer.cache.target-size-mb来指定。
PRIVATE LocalResource 与PUBLIC LocalResource的生命周期相同。
APPLICATION LocalResource会在应用程序结束后立即被删除。

注意:APPLICATION LocalResource生命周期是以Application来界定,而不是ApplicationAttempt。

Apache Hadoop YARN的管理

基本的Yarn管理

Yarn环境的基本配置文件如下:

core-default.xml 系统范围的配置
hdfs-default.xml 分布式文件系统的配置
mapred-default.xml YARN的MapReduce框架配置
yarn-default.xml YARN的配置

YARN的管理工具

Yarn有一些内置管理功能,通过yarn rmadmin -help 命令行命令可以查看具体的命令说明。

增加或关闭YARN节点

有两个文件决定了哪些节点属于集群内,哪些节点数据不属于集群内:yarn.resourcemanager.nodes.include-path和yarn.resourcemanager.nodes.exclude-path。当这两个文件修改之后,可以通过下面的命令行命令来刷新ResourceManager,对节点进行容纳或踢出。

1
yarn rmadmin -refreshNodes

执行刷新命令需要管理员权限,管理员是通过ResourceManager上配置项 yarn.admin.acl 来指定的。

Capacity调度器的配置

调度器的详细配置会在稍后介绍,这里我们介绍重新配置和添加队列。
要重新配置或添加队列,可以使用前面提到的confidence-haddoop2.sh或者直接编辑$Hadoop_conf_dir/etc/hadoop/capacity-scheduler.xml来实现。修改后,执行如下命令行命令来进行刷新:

1
yarn rmadmin -refreshQueues

注意不能够删除队列,只能增加或重新配置队列。

YRAN的Web代理

默认情况下,代理是作为ResourceManager的一部分运行的,但是也可以通过配置项 yarn.web-proxy.address 来使其独立运行。配置项默认为空,表示在ResourceManager运行。独立运行时,通过 yarn.web-proxy.principal 和 yarn.web-proxy.keytab 两个配置项可以控制使用 Kerberos 进行安全认证。

使用 JobHistoryServer

。。。

更新用户到用户组的映射关系

配置项 hadoop.security.group.mapping 决定了 ResourceManager 中使用的用户与用户组的映射关系的定义类。默认为org.apache.hadoop.security.ShellBasedUnixGroupsMapping,如果用户想要实现自己的类,需要实现org.apache.hadoop.security.GroupMappingServiceProvider接口。如果修改了映射关系,则需要使用下面的命令来更新ResourceManager:

1
yarn rmadmin -refreshUserToGroupMapping

更新超级用户代理群映射关系

通过配置 hadoop.proxyuser.< proxy-user-name >.groups ,可以让用户$proxy-user-name 成为具有特殊权限的用户,它可以模拟配置值中的任意用户。配置项 hadoop.proxyuser.< proxy-user-name >.hosts 的配置值为逗号分隔的主机列表,只有配置在这里的这些主机,才可以使用前面的 $proxy-user-name 来模拟所配置的用户。如果修改这两个配置,则需要使用下面的命令行命令来更新ResourceManager:

1
yarn rmadmin -refreshSuperUserGroupsConfiguration

$proxy-user-name用户在模拟其他用户时,自己必须使用Kerberos认证。

更新ResourceManager管理的ACL

配置项 yarn.admin.acl 指定了谁是YRAN集群的管理员。管理员可以更新队列、管理节点列表、用户-群组映射、管理列表本身以及服务级别的ACL。还可以查看任何用户的Application、访问所有的Web界面、调用任何Web服务,以及杀掉任何队列中的Application。
这个配置项的值是一个用逗号分隔的用户列表和一个用逗号分隔的用户组列表,用户列表和用户组列表使用空格分隔,如:

1
user1,user2 group1,group2

当这个配置项变更后,管理员需要使用下面的命令行命令来刷新ResourceManager

1
yarn rmadmin -refreshAdminAcls

重新加载服务级授权策略文件

管理员可以使用下面的命令行命令来重新加载授权策略文件:

1
yarn rmadmin -refreshServiceACL

管理YARN作业

Yarn的作业可以通过 yarn application 命令来管理。可以使用的子命令有:kill、list、status、appTypes和help。如

1
yarn application -list

各个子命令的作用,通过名字我们也可以看出来。

设置Container的内存

通过 yarn-site.xml 中的三个重要的配置,可以控制Container的内存:

yarn.nodemanager.resource.memory-mb 指定了NodeManager可以给Container使用的内存总量(机器上可以用来给Container分配的最大内存)。
yarn.scheduler.minimum-allocation-mb 是ResourceManager允许分配给Container的最小内存。如果请求的Container的内存值小于这个值,则使用这个值,默认为1024MB。
yarn.scheduler.maximum-allocation-mb 是ResourceManager允许分配给Container的最大内存,默认为8192MB。

设置Container核数

通过 yarn-site.xml中的配置,我们可以控制Container的核数:

yarn.scheduler.minimum-allocation-vcores 指定了Container使用的最小core数。
yarn.scheduler.maximum-allocation-vcores 指定了Container使用的最大core数。
yarn.nodemanager.resource.cpu-vcores 指定了节点上可以用来给Container分配的总core数。

用户日志管理

在应用完成后,Yarn通过NodeManager提供的将日志安全的移动到HDFS上的功能,解决了日志管理问题。

Yarn上的日志聚合

有了Yarn,对于同一个应用的所有Container的日志,可以聚合并写到指定文件系统的指定目录中的一个单独文件中。用户可以通过命令行工具、web用户界面或直接从文件系统来访问这些日志。 在MapReduce的JobHistoryServer中运行着一个AggregatedLogDeletionService服务,会周期性的删除聚合日志。

Web用户界面

在应用运行期间,用户可以通过ApplicationMaster的用户界面看到日志,它将用户重定向到NodeManager的用户界面。一旦Application运行结束,完整的信息就交由JobHistoryServer管理。

命令行访问

用户还可以使用命令行工具来与日志进行交互。可以运行下面的命令来查看可用命令:

1
yarn logs

命令的格式为:

1
yarn logs -applicationId < application ID > [Options]

常用的选项有:-appOwner、-containerId、-nodeAddress,例如下面的命令可以打印出整个Application的全部日志:

1
yarn logs -applicationId XXXXXXX

使用命令行工具的优点是可以通过Shell工具来帮助处理日志信息。

日志的管理和配置

yarn.nodemanager.log-dirs 决定了在Container运行时,它的日志在节点上保存的位置。应用的本地化日志目录在{yarn.nodemanager.log-dirs}/application${applicationId}下。各个Container的日志目录位于{yarn.nodemanager.log-dirs}/application${applicationId}/container_{$containerId}子目录中。
yarn.log-aggregation-enable 指定了是否开启日志聚合功能。如果关闭了,NodeManager会本地化保存日志,不会进行聚合操作。

如果开启了日志聚合功能,那么下面的配置也将生效:

yarn.nodemanager.remote-app-log-dir:指定了NodeManager将在哪里聚合日志。
yarn.nodemanager.remote-app-log-dir-suffix:将在${yarn.nodemanager.remote-app-log-dir}/${user}下创建远程日志的后缀,默认为log。需要注意,这里的${user}是指Container创建用户的用户名。比如一个ThriftServer,可能使用xiaomao启动的,那么这里的user就是xiaomao。
yarn.log-aggregation.retain-seconds:删除聚合日志的延迟,将在这个时长之后删除日志。如果为负数则表示不删除。
yarn.log-aggregation.retain-check-interval-seconds:定义检查日志删除的周期,如果为0或负数,那么将会按照聚合日志保留时间的十分之一来计算。
yarn.log.server.url:Application完成后,NodeManager用来将Web用户界面重定向的URL。

如果关闭了日志聚合功能,那么下面的配置将生效:

yarn.nodemanager.log.retain-seconds:日志聚合功能关闭的情况下,各个节点上保留用户日志的时长(单位:秒)。
yarn.nodemanager.log.deletion-threads-count:日志聚合功能关闭情况下,NodeManager用于清理日志所启动的线程数量。

日志权限

远程日志目录需要其所有者是${NMUser},权限为1777,并且目录和组属于${NMGroup}。每个Application级的目录都是770。

Apache Hadoop YARN的架构指南

Yarn将它的功能分为两层:资源管理平台和程序调度执行。ResourceManager只是简单地基于应用程序的请求做中心的资源配置,而不关心应用程序是如何使用这些资源的。它把这个职责委托给了ApplicationMaster,由ApplicationMaster来协调单个应用程序从ResourceManager请求来的资源的逻辑执行,产生应用程序自己的具体的工作计划,ApplicationMaster利用接收到的资源,协调这个具体计划的执行。

概述

ApplicationMaster和响应的Container一起组成了一个YARN的应用程序。ResourceManager提供应用程序的调度。每个应用程序由一个ApplicationMaster管理,以Container的相识请求每个任务的计算资源。Container由ResourceManager调度,在NodeManager上运行。

ResourceManager

ResourceManager和如下的组件一起工作:

每个节点上的NodeManager: 从ResourceManager中获取指令,管理单个节点上可用资源,并接收ApplicationMaster的资源请求。
每个应用程序的ApplicationMaster: ApplicationMaster向ResourceManager申请资源并和NodeManager一起工作,启动、监控和停止Container。

ResourceManager组件概述

ResourceManager会向客户端、NodeManager、ApplicationMaster和其他内部核心组件提供服务。

客户端与ResourceManager交互

用户和平台的第一次交互点是客户端与ResourceManager的交互。这个交互可以分为下面几个部分:

Client Service

这个服务实现了基本的客户端到ResourceManager的接口ApplicationClientProtocol。ClientService处理来自客户端到ResourceManager的RPC通信,包括:

Application的提交
Application的终止
获取Application队列、集群统计,用户ACL以及更多信息。

ClientService为ResouceManager提供额外的保护。当管理员在安全模式下运行Yarn时,ClientService确保所有来自用户的请求都已经得到认证,并且通过查找Application的ACL及后续队列层的ACL对每个用户进行授权。对于不能直接通过Kerberos认证的客户端,ClientService也提供了API,包括ResourceManager代理令牌。

Administration Service

为了确保管理员的请求不会被一般用户的请求饿死,提供高优先级的操作命令,Yarn为所有管理员操作服务提供了一个接口:Administation Service。管理员客户端与Administration Service之间使用ResourceManagerAdministrationProtocol协议通信。
一些重要的管理员操作:

刷新队列。
刷新ResourceManager处理的节点列表。
添加新用户组,添加/更新管理员的ACL。

Application ACL Manager

对于面向用户的API,ResourceManager需要进行控制,只有经过认证的用户才可以访问。ApplicationACLManager管理了每个Application的ACL。ResourceManager可以通过配置yarn.acl.enable为true来启用Application的ACL。
ACL用于控制Application的查看和修改:

查看,决定了通过RPC接口查看一些或所有Application的相关细节,Web UI及Web服务。
修改,决定了哪些用户可以“修改”(杀死)应用程序。

ACL时一个剋执行特殊操作的用户和组列表。用户可以通过他们提交的应用的ApplicationSubmissionContext信息的一部分来指定ACL,这些ACL由ACL Manager对每一个Application进行维护。所有管理员(由yarn.admin.acl属性配置的)可以忽略这些ACL来执行任意操作。
同样的ACL传递给ApplicationMaster,这样ApplicationMaster可以使用该信息让用户访问ApplicationMaster内部的一些服务。
作为ContainerLaunchContext的一部分,当启动一个Container时,也可以使用相同ACL来控制对Application和Container的请求。

ResourceManager Web Application和Web Service

ResourceManager有一个Web应用程序来输出集群的状态信息、指标、节点活跃列表、健康/非健康的节点列表、应用程序列表以及他们的状态和结果、指向ApplicationMaster Web接口的超链接及一个调度的专用接口。

应用程序和ResourceManager的通信

一旦客户端向ResourceManager提交的服务被纳入系统,它穿过ResourceManager内部负责拉起ApplicationMaster的状态机。下面描绘了当ApplicationMaster启动后如何与ResourceManager通信。

ApplicationMaster Service

ApplicationMaster Service用于响应所有来自ApplicationMaster的请求。它与ApplicationMaster之间通过ApplicationMasterProtocol协议,也是唯一的协议。ApplicationMaster Service主要负责如下功能:

注册新的ApplicationMaster。
来自任意正在运行的ApplicationMaster的终止/取消注册请求。
认证来自不同ApplicationMaster的所有请求,确保只有合法的ApplicationMaster发送的请求传递给ResourceMaster中的应用程序对象。
获取来自所有运行ApplicationMaster的Container的分配和释放请求,异步的转发给Yarn调度器。

ApplicationMasterServer有额外的逻辑来保证在任意时间只有一个ApplicationMaster能够发送请求给ResourceManager。

ApplicationMaster 存活监控

为了管理死掉的ApplicationMaster,这个监控跟踪每个ApplicationMaster以及它的最后心跳。在配置的时间间隔外,没有产生心跳的ApplicationMaster会被认为死亡切在ResourceManager中超时。所有处于运行中/分配状态且从属于这个超时的ApplicationMaster的Container也会被标记为死掉。ResourceManager重新调度这个Application,在一个新的Container上重新运行一个ApplicationMaster实例,这样的尝试最多允许两次。

节点和ResourceManager的通信

NodeManager会与下面的RsourceManager的组件进行通信。

Resource Tracker Service

ResourceManagerTracker负责对NodeManager的心跳请求进行响应。它实现了ResourceTracker接口。它负责以下任务:

注册新节点
接收前面注册节点的心跳
确保只有合法的节点可以和ResourceManager通信。

ResouceManager会拒绝任意非法或退役的节点的需求。不满足ResourceManager最小资源配置的请求也会被拒绝。
一旦注册成功,ResourceManager在它的响应信息中会将用于对ApplicationMaster进行认证的相关信息发送给NodeManager。NodeManager会对ApplicationMaster提交的启动Container请求中的NodeManager令牌和Container令牌。处于安全考虑,这些令牌会在之后的心跳中进行同步更新。
Resource Tracker Service转发合法的心跳給YARN调度器,Yarn调度器随后根据节点的空闲可用资源对不同的资源请求作出调度响应。
Resource Tracker Service跟NodeManager存活监控、nodes-list-manager紧密合作。

NodeManager 存活监控

Resource Tracker Service跟踪每一个节点(通过ID)的最后心跳时间,任何没有在配置的时间间隔内发送心跳的节点认为死亡,且在ResourceManager中超时。所有运行在超时节点上的Container也会被标记为死亡,并且不再有新的Container调度到此节点。

Nodes-List Manager

Node-list manager是在ResourceManager内存中的一个集合,包括有效的节点和被排除的节点。它通过读取yarn.resourcemanager.nodes.include-path和yarn.resourcemanager.nodes.exclude-path指定的文件来初始化节点列表。

ResourceManager 核心组件

ApplicationsMaster

ApplicationsManager负责管理已经提交的应用程序的集合。在应用程序提交后,首先检查应用程序的规格,拒绝ApplicationMaster资源请求不合法的Application,确定Applicaiton id的唯一性,最后将通过检查的Application转給调度器。
该组件还负责记录和管理已结束的Application,过一段时间才会从ResourceManager的存储中清除。当一个Application结束后,它将一个ApplicationSummary放到后台的日志文件中。ApplicationSummary是一个Application在结束时的信息总结。
ApplicationsManager保存已经结束的应用程序的缓存,以便用户请求这些应用程序的数据。yarn.resourcemanager.max-completed-applications 指定了ResourceManager存储完成的Application的个数。存储在ResourceManager中的已完成的Application是先进先出的。

ApplicationMaster Launcher

Yarn中,非运行ApplicationMaster的Container都是由ApplicationMaster启动的,而运行ApplicationMaster的Container是由ResourceManager分配并启动的。ApplicationMaster Launcher负责此任务,该组件维护一个线程池来设置环境,且和NodeManager通信启动新提交的ApplicationMaster的Container。在Application结束时,它还会通知NodeManager清理ApplicationMaster对应的Container。

YarnScheduler

Yarn调度器负责为正在运行的Application分配资源,应用程序资源的调度受到容量、队列等多方面的影响。Yarn scheduler基于Application的资源申请请求来执行调度,这些资源包含内存、cpu、磁盘、网络等,当前只支持内存和cpu。

ContainerAllocationExpirer

该组件负责确保所有分配的Container最终都被ApplicationMaster使用,并在相应的NodeManager上启动。ContainerAllocationExpirer包含了一个已经分配但未在相应NodeManager上启动的Container的列表。对于任何一个刚分配的Container,如果在设置的时间内,指定运行Container的NodeManager没有向ResourceManager发送Container已经启动的报告,ResourceManager将会把这个Container标记为死亡且超时。
此外,NodeManager也会在启动Container时对上面所说的超时进行认证(超时信息绑定在Container的ContainerToken中),对于已经超时的Container,NodeManager拒绝启动。

ResourceManager 安全相关的组件

ResourceManager有一系列的组件叫做SecretManager,负责管理令牌和私钥,这些令牌和私钥用于对各个RPC接口上的请求进行认证/授权。

ContainerToken secretManager

SecretManager负责管理ContainerToken。ContainerToken是ResourceManager提供给ApplicationMaster的一个特殊的令牌集合,这样ApplicationMaster可以在特定的节点上使用申请到的Container。SecretManager负责对密钥进行跟踪和更新。
从安全角度来说,在启动一个Container之前,我们不能信任ApplicationMaster传递给NodeManager的信息是正确的,因为ApplicationMaster可能会编造的内存或CPU。为此,ResourceManager发送信息给ApplicationMaster之前,在Container令牌里加密了Container的相关信息。因此一个Container令牌由下面的字段组成:

Container ID:Container的唯一标识。NodeManager使用此信息与特定应用程序绑定。
NodeManager地址:Container令牌编码了目标NodeManager的地址,这样就绑定了NodeManager与Container的关系。
应用程序提交者:提交Application到ResourceManager的用户。NodeManager使用这个用户身份执行所有Container相关的活动。
资源: 通知NodeManager给Container分配的资源。NodeManager使用这个信息计算Container的可用资源,并对Container使用的资源进行监控,超出这个分配限制,就会杀掉Container。
超时时间戳:NodeManager根据这个时间戳判断Container是否已经过期,过期的Container会被拒绝启动。
主键标识符:NodeManager用来验证发送给它们的Container令牌。ResourceManager生成密钥,并为密钥分配一个唯一标识。这个密钥和它的唯一标识会通知所有的NodeManager。在NodeManager注册时会发送给NodeManager,之后会在心跳中进行更新通知。密钥更新时,ResourceManager不会立即使用新的密钥,只有在所有NodeManager都得到新的密钥或经过密钥启动时间,才会启用新的密钥。对于NodeManager,可以同时存在两个密钥,具体使用哪个,会根据密钥的唯一标识进行区分。
ResourceManager标识符:ResourceManager也可能会重启,为了区分Container的分配关系,所以将ResourceManager标识符信息编码到Container令牌中。ResourceManager重启会杀掉之前分配的所有的Container,NodeManager也会拒绝之前ResourceManager分配的Container的启动。

AMRMToken 密钥管理器

只有ApplicationMaster可以以Container的形式来请求资源。为了避免恶意程序模仿ApplicationMaster,ResourceManager使用一个叫做AMRMToken的令牌,每个ApplicationAttempt对应一个令牌。密钥管理器在内存中保存每个令牌直到ApplicationMaster结束,在此期间,使用这些令牌来对ApplicationMaster的请求进行认证。
ApplicationMaster可以通过加载一个由YARN本地化的证书来得到这个令牌。这个本地化文件由ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME指定。
与Container的令牌不同,AMRMToken不与系统中的其他实体共享,出于安全的原因,令牌也会每隔一段时间滚动更新,但是不需要激活间隔。

NMToken 密钥管理器

Container令牌用来对来自ApplicationMaster的启动Container的请求进行授权。它们只在为了启动Container而建立的ApplicationMaster到NodeManager的连接中有效。Container令牌的的关键作用是为了防止资源滥用。
除了启动Container时ApplicationMaster和NodeManager建立连接,NodeManager还允许ApplicationMaster停止Container或获取Container状态。但是需要注意的是,ApplicationMaster与每个Container都创建到NodeManager的持久连接是不现实的。
NMToken服务用来解决这个问题,ApplicationMaster使用NMToken来管理跟一个NodeManager的连接,使用这个令牌想这个节点发送请求。

ResourceManager给每个NodeManager和每个Application的Attempt生成一个NMToken。
当Container创建,ResourceManager生成对应NodeManager的NMToker给applicationMaster。
为了优化网络,不是每分配一个Container就把对应NodeManager的NMToker发送给ApplicationMaster,只会在ApplicationMaster的Container首次在NodeManager上创建时才会发送,除此之外,就是在主密钥更新,NMToker失效的情况下才会在此发送给ApplicationMaster。
当ApplicationMaster接收到新的NMToken,它会将对应于NodeManager的旧令牌替换掉。ApplicationMaster内部使用NMTokenCache库来管理令牌。
ApplicationMaster应该始终使用最新的NMToker。如果ApplicationMaster从ResourceManager收到新的NMToken,那么ApplicationMaster应NodeManager的旧连接应该关闭并创建新的连接。如果连接是使用旧的令牌创建的,在发起请求时,NodeManager会简单的拒绝。
和ContainerToken一样,ApplicationMaster与NMToker也是有绑定关系的,会将Application Attempt集成到NMToken中。

RMDelegationToken密钥管理器

这个组件是ResourceMananger上代理令牌的密钥管理。它负责给客户端生成代理令牌,代理令牌可以传递给想要和ResourceManager通信,但没有经过Kerberos认证的Application。

DelegationToken Renewer

在安全模式下,ResourceManager应该开启Kerberos认证,此组件在应用程序运行期间更新应用程序的令牌,知道令牌不能再更新。

NodeManager

NodeManager是Hadoop YARN在每个计算节点上的代理,它根据YARN Application的要求,使用节点上的资源来运行Contianer。NodeManager本质上是Yarn的工作守护进程,主要职责如下:

保持与ResourceManager的同步。
跟踪节点的健康状况。
管理各个Container的生命周期,监控每个Container的资源使用情况。
管理分布式缓存。
管理各个Container生成的日志。
不同的Yarn应用可能需要的辅助服务。

NodeManager 各个组件概述

NodeManager的核心功能是对Container的管理。NodeManager接受来自ApplicationMaster的启动或停止Container的请求,对Container令牌进行鉴权,管理Container执行的依赖库,监控Container的执行过程。管理员

YARN 的HA

参考地址:https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html

spark-2-11-BlockManager

发表于 2019-04-23   |   分类于 spark 2.11

在SparkEnv中创建BlockManagerMaster和BlockManagerMasterEndpoint,在生成BlockManagerMaster的时候需要BlockManagerMasterEndpoint(作用是什么??????????)
除此之外,BlockManager也会在SparkEnv中进行创建,并且创建BlockManager的时候,需要BlockManagerMaster的引用(作用是什么???????),而且还需要一个BlockTransferService(作用是什么???????)。

BlockManagerMaster在driver和executor上都有运行。

首先看一下BlockManagerMaster的实现

创建BlockManagerMaster的时使用的RpcEndpointRef,保持与driver的通信,接下来BlockManagerMaster中的方法,都会用到这个RpcEndpointRef。

BlockManagerMaster中的RpcEndpointRef用于向driver发送消息,dirver中会向其他BlockManagerMaster同步这个消息。
比如在BlockManagerMaster的stop方法

1
2
3
4
5
if (driverEndpoint != null && isDriver) {
tell(StopBlockManagerMaster)
driverEndpoint = null
logInfo("BlockManagerMaster stopped")
}

只有dirver节点的BlockMasterMaster停止时,才会向driver通知StopBlockManagerManager事件。

tell方法调用BlockManagerMaster的RpcEndpointRef的askSync方法发送一个事件,并期待得到true,否则抛出SparkException异常。

1
2
3
4
5
private def tell(message: Any) {
if (!driverEndpoint.askSync[Boolean](message)) {
throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
}
}

需要注意的是,这里调用的方法是askSync,同步请求。除此之外,RpcEndpointRef还有ask方法,是异步执行的。

上面我们看到了stop方法的实现,接下来我们顺序看一下:

removeExecutor

移除Executor,其实现是向driver发送一个RemoveExecutor对象。

1
2
3
4
def removeExecutor(execId: String) {
tell(RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
}

removeExecutorAsync

异步移除Executor,与removeExecutor的操作类似,只是使用的RpcEndpointRef的异步方法。

1
2
3
4
def removeExecutorAsync(execId: String) {
driverEndpoint.ask[Boolean](RemoveExecutor(execId))
logInfo("Removal of executor " + execId + " requested")
}

registerBlockManager

向dirver注册Blockmanager的id(可以这么理解吗??????)。输入的BlockManagerId对象,不包含拓扑信息。注册返回的BlockManagerId会用来更新BlockManagerMaster的数据。

1
2
3
4
5
6
7
8
9
10
11
def registerBlockManager(
blockManagerId: BlockManagerId,
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
logInfo(s"Registering BlockManager $blockManagerId")
val updatedId = driverEndpoint.askSync[BlockManagerId](
RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $updatedId")
updatedId
}

其实现就是通过RpcEndpointRef向driver发送一个RegisterBlockManager事件,事件中包含BlockManagerId、最大堆内内存和slaveEndpoint(RpcEndpointRef)。

updateBlockInfo

updateBlockInfo方法用于向driver发送某个Block的最新信息,信息包括存储级别、内存大小和磁盘大小。

1
2
3
4
5
6
7
8
9
10
11
def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
val res = driverEndpoint.askSync[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logDebug(s"Updated info of block $blockId")
res
}

其实现是通过RpcEndpointRef向dirver发送UpdateBlockInfo事件。

getLocations

获取指定BlockId的位置。

1
2
3
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
}

其实现是通过RpcEndpointRef向driver发送一个GetLocations事件。

getLocations

获取多个BlockId的位置

1
2
3
4
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
GetLocationsMultipleBlockIds(blockIds))
}

其实现是通过RpcEndpointRef向driver发送一个GetLocationsMultipleBlockIds事件。

contains

判断当前BlockManagerMaster中是否包含指定的Block

1
2
3
def contains(blockId: BlockId): Boolean = {
!getLocations(blockId).isEmpty
}

其实现为,通过RpcEndpointRef向driver发送一个获取BlockId位置的请求,如果可以获得表示Block存在。

getPeers

从driver那里获取集群中其他BlockManagerId

1
2
3
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

通过RpcEndpoint向driver发送一个GetPeers事件

getExecutorEndpointRef

获取指定executor的RpcEndpointRef信息。

1
2
3
def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}

其实现是,通过RpcEndpointRef向driver发送一个GetExecutorEndpointRef事件。

removeBlock

移除指定的Block,只有driver知道的Block才能够被移除。

1
2
3
def removeBlock(blockId: BlockId) {
driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
}

通过RpcEndpointRef向driver发送RemoveBlock事件来实现。

removeRdd

移除所有归属于指定RDD的Block

1
2
3
4
5
6
7
8
9
10
def removeRdd(rddId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
}(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}

通过RpcEndpointRef向driver发送RemoveRdd来实现,第二个参数决定是否要等到结果返回。

removeShuffle

移除所有属于指定Shuffle的Block。

1
2
3
4
5
6
7
8
9
10
def removeShuffle(shuffleId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
}(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}

通过RpcEndpointRef向driver发送RemoveShuffle事件来实现,第二个参数决定是否要等到结果返回。

removeBroadcast

移除所有归属于指定Broadcast的Block。

1
2
3
4
5
6
7
8
9
10
11
12
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
}(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}

通过RpcEndpointRef向driver发送RemoveBroadcast事件来实现。第三个参数blocking决定是否要等到结果返回。

getMemoryStatus

获取每个BlockManager的内存状态,返回每个BlockManager所分配的最大内存以及内存的剩余。

1
2
3
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
driverEndpoint.askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}

通过RpcEndpointRef向driver发送GetMemoryStatus对象来实现。

getStorageStatus

获取存储存储状态

1
2
3
def getStorageStatus: Array[StorageStatus] = {
driverEndpoint.askSync[Array[StorageStatus]](GetStorageStatus)
}

通过RpcEndpointRef向driver发送GetStorageStatus对象来实现。

getBlockStatus

获取所有Block Manager上的block的状态。该操作开销昂贵,仅用于测试。
通过RpcEndpointRef向driver发送GetBlockStatus事件来实现。

getMatchingBlockIds

返回符合过滤器的BlockId

1
2
3
4
5
6
7
def getMatchingBlockIds(
filter: BlockId => Boolean,
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
timeout.awaitResult(future)
}

通过RpcEndpointRef向driver发送GetMatchingBlockIds事件来实现。

hasCachedBlock

判断指定Executor是否缓存了Block,不包括broadcast block。

1
2
3
def hasCachedBlocks(executorId: String): Boolean = {
driverEndpoint.askSync[Boolean](HasCachedBlocks(executorId))
}

通过RpcEndpointRef向driver发送HasCachedBlocks事件来实现。

通过对上面方法的了解我们知道,BlockManagerMaster不一定是运行在driver上的,也会运行在Executor上。它内部持有与driver进行沟通的RpcEndpointRef。在SparkEnv中,生成BlockManager的时候,会将BlockManagerMaster传递给BlockManager。

接下来看一下BlockManagerMasterEndpoint

上面我们已经知道了BlockManagerMaster会通过BlockManagerMasterEndpoint来请求相关的操作。BlockManagerMasterEndpoint就是用来对BlockManagerMaster的请求进行响应的。

首先看一下BlockManagerMasterEndpoint的成员

blockManagerInfo 存储了BlockManagerId到BlockManagerInfo的映射关系。
blockManagerIdByExecutor 存储了executor id 到 BlockManagerId的映射关系。
blockLocations 存储了BlockId到BlockManagerId的映射关系。
topologyMapper 拓扑映射的实现类。

上面topologyMapper的定义,首先读取配置项“spark.storage.replication.topologyMapper”的值,如果没有配置则使用默认的拓扑管理DefaultTopologyMapper。然后使用SparkConf进行实例化。

接下来针对BlockManagerMasterEndpoint对BlockManagerMaster的应答进行解析

RegisterBlockManager的应答register

register方法是对RegisterBlockManager事件的应答,它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private def register(
idWithoutTopologyInfo: BlockManagerId,
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
val id = BlockManagerId(
idWithoutTopologyInfo.executorId,
idWithoutTopologyInfo.host,
idWithoutTopologyInfo.port,
topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(oldId) =>
// A block manager of the same executor already exists, so remove it (assumed dead)
logError("Got two different block manager registrations on same executor - "
+ s" will replace old one $oldId with new one $id")
removeExecutor(id.executorId)
case None =>
}
logInfo("Registering block manager %s with %s RAM, %s".format(
id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
blockManagerIdByExecutor(id.executorId) = id
blockManagerInfo(id) = new BlockManagerInfo(
id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
}

该方法返回的是一个BlockManagerId,与输入参数不同,返回的对象中包含了拓扑信息。而这个拓扑信息就是从topologyManager中得到的。接下来,判断BlockManagerId到BlockManagerInfo的信息中是否含有当前BlockManagerId的信息,如果含有,则跳过执行,只有在不含有的时候,才会继续处理。
接着判断是否在blockManagerIdByExecutor中,如果有,则说明相同executor的BlockManager已经存在了,需要将这个executor的老的BlockManager移除掉,然后将新的BlockManager添加到blockManagerIdByExecutor中。并且根据BlockManagerId.id、最大堆内内存、最大堆外内存、以及要注册的BlockManagerId的RpcEndpointRef生成BlockManagerInfo对象,添加到BlockManagerId到BlockManagerInfo的映射关系中。

UpdateBlockInfo的应答updateBlockInfo

updateBlockInf是对UpdateBlockInfo事件的应答,它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
return true
} else {
return false
}
}
// 更新BlockManagerInfo最后操作时间
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
return true
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
locations = blockLocations.get(blockId)
} else {
locations = new mutable.HashSet[BlockManagerId]
blockLocations.put(blockId, locations)
}
if (storageLevel.isValid) {
locations.add(blockManagerId)
} else {
locations.remove(blockManagerId)
}
// Remove the block from master tracking if it has been removed on all slaves.
if (locations.size == 0) {
blockLocations.remove(blockId)
}
true
}

因为该方法的功能为更新,所以如果参数给定的BlockManagerId不存在会返回false(driver除外)。然后调用BlockManagerInfo中的updateBlockInfo方法来更新BlockManagerInfo,这个方法稍后介绍BlockManagerInfo的时候再看。接着,记录BlockId到BlockManagerId的映射关系,因为一个BlockId的数据可能存在多个BlockManager中(或者分散存储,或者有多个副本),因此BlockId对应的是一个HashSet,HashSet中存放的是BlockManagerId,但是需要注意的是,BlockManagerId是否可以加到上面的HashSet中,取决于参数的storageLevel,只有storageLevel有效时(存储级别为内存或磁盘,且副本个数大于0)才会存储。最后对blockLocations进行清理,对于没有存储位置的BlockId,从blockLocations中删除。

GetLocations的应答getLocations

getLocations是对GetLocations事件的应答,它的定义如下:

1
2
3
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}

在updateBlockInfo方法中我们已经知道blockLocations存储的是BlockId到BlockManagerId的映射。因此,只要有BlockId,就可以找到BlockManagerId的列表。

GetLocationsMultipleBlockIds的应答getLocationsMultipleBlockIds

getLocationsMultipleBlockIds是对GetLocationsMultipleBlockIds事件的应答,它的定义如下:

1
2
3
4
private def getLocationsMultipleBlockIds(
blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
blockIds.map(blockId => getLocations(blockId))
}

相对于getLocations,这个方法传输的参数是BlockId列表,因此只需要迭代获取每个BlockId的位置,返回一个映射关系即可。

GetPeers的应答getPeers

getPeers是对GetPeers事件的应答,它的定义如下:

1
2
3
4
5
6
7
8
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
} else {
Seq.empty
}
}

该方法就是获取BlockManagerId节点上其他非driver的BlockManagerId的列表。从BlockManagerId到BlockManagerInfo的映射关系中得到KeySet,就是当前节点上所有的BlockManagerId,只要过滤到driver以及与当前BlockManagerId相同id的BlockManagerId即可。

GetExecutorEndpointRef的应答getExecutorEndpointRef

getExecutorEndpointRef是GetExecutorEndpointRef事件的应答,它的定义如下:

1
2
3
4
5
6
7
8
private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
for (
blockManagerId <- blockManagerIdByExecutor.get(executorId);
info <- blockManagerInfo.get(blockManagerId)
) yield {
info.slaveEndpoint
}
}

我们在register方法中生成BlockManagerInfo时,BlockManagerInfo的最后一个参数就是slaveEndpoint(一个RpcEndpointRef对象)。所以调用getExecutorEndpointRef方法,只要根据executorId得到BlockManagerId,然后根据BlockManagerId得到BlockManagerInfo,就可以得到这个slaveEndpoint了。

GetMemoryStatus的应答memoryStatus

memoryStatus是GetMemoryStatus事件的应答,其定义如下:

1
2
3
4
5
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
}

方法逻辑也很简单,BlockManagerInfo中已经存着最大内存了(最大堆内内存和最大堆外内存),并且在每次执行updateBlockInfo方法时会对剩余内存进行操作,只要拿到BlockManagerInfo,就拿到了内存状态,但是缺点是无法知道具体的堆内内存和堆外内存的状态。

GetStorageStatus的应答storageStatus

storageStatus方法是对GetStorageStatus事件的应答,其定义如下:

1
2
3
4
5
6
private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
Some(info.maxOffHeapMem), info.blocks.asScala)
}.toArray
}

StorageStatus,我们理解为存储状态,也就是每个BlockManager(用BlockManagerId表示)的存储状态,包括最大内存、最大堆内内存、最大堆外内存和Block列表(BlockId->BlockStatus的对应关系集合)。这些信息都存储在BlockManagerInfo中,只要拿到BlockManagerInfo就OK了。

GetBlockStatus的应答blockStatus

blockStatus方法是对GetBlockStatus事件的应答,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private def blockStatus(
blockId: BlockId,
askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
val getBlockStatus = GetBlockStatus(blockId)
/*
* Rather than blocking on the block status query, master endpoint should simply return
* Futures to avoid potential deadlocks. This can arise if there exists a block manager
* that is also waiting for this master endpoint's response to a previous message.
*/
blockManagerInfo.values.map { info =>
val blockStatusFuture =
if (askSlaves) {
info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
} else {
Future { info.getStatus(blockId) }
}
(info.blockManagerId, blockStatusFuture)
}.toMap
}

获取BlockId的状态,如果askSlaves,则会调用info中的RpcEndpointRef取获取最新的BlockSatus,否则就使用BlockManagerInfo中当前的。一个BlockId可能会对应多个BlockManagerId。

GetMatchingBlockIds的应答getMatchingBlockIds

getMatchingBlockIds方法是对GetMatchingBlockIds事件的应答,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private def getMatchingBlockIds(
filter: BlockId => Boolean,
askSlaves: Boolean): Future[Seq[BlockId]] = {
val getMatchingBlockIds = GetMatchingBlockIds(filter)
Future.sequence(
blockManagerInfo.values.map { info =>
val future =
if (askSlaves) {
info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
} else {
Future { info.blocks.asScala.keys.filter(filter).toSeq }
}
future
}
).map(_.flatten.toSeq)
}

我们已经知道BlockManager的具体信息是使用BlockManagerInfo对象来表示的,在BlockManagerInfo中的blocks就存储了BlockId到BlockStatus的映射关系,filter就是对BlockId对象进行过滤,并得到符合条件的BlockId。

RemoveRdd的应答removeRdd

removeRdd方法是对RemoveRdd事件的应答,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocks.foreach { blockId =>
val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
blockLocations.remove(blockId)
}
val removeMsg = RemoveRdd(rddId)
Future.sequence(
blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
}

我们已经blockLocatins中存储的是BlockId到Set[BlockManagerId]的映射关系。一个BlockId是否是RDD,可以通过asRDDId得到,这样就可以得到符合条件的RDD的BlockId。通过BlockId能够得到BlockManagerId的列表,通过blockManagerInfo对象,我们可以得到BlockManagerId所对应的BlockManagerInfo。调用BlockManagerInfo中的removeBlock方法移除对RDD所对应的block的操作(空间使用的记录的释放)。然后告诉所有的BlockManager,要删除RDD(通过BlockManagerInfo中的slaveEndpointRef)。

RemoveShuffle的应答removeShuffle

removeShuffle方法是对RemoveShuffle事件的应答,其定义如下:
·

1
2
3
4
5
6
7
8
9
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
// Nothing to do in the BlockManagerMasterEndpoint data structures
val removeMsg = RemoveShuffle(shuffleId)
Future.sequence(
blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Boolean](removeMsg)
}.toSeq
)
}

方法的实现和RemoveRdd方法类似,直接对blockManagerInfo中的values(BlockManagerInfo集合)扫描,调用info的rpcEndpointRef,请求移除Shuffle(发送RemoveShuffle事件)。

RemoveBroadcast的响应removeBroadcast

removeBroadcast方法是对RemoveBroadcast事件的响应,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
}

方法的实现与removeShuffle方法类似,直接对BlockManagerInfo中的values(BlockManagerInfo集合)扫描,然后调用info的repEndpointRef,请求移除广播变量。

RemoveBlock的应答removeBlockFromWorkers

removeBlockFromWorkers方法是对RemoveBlock事件的应答,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def removeBlockFromWorkers(blockId: BlockId) {
val locations = blockLocations.get(blockId)
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
if (blockManager.isDefined) {
// Remove the block from the slave's BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
}
}
}
}

方法的实现逻辑与removeShuffle方法类似,blockLocations中保存了BlockId到Set[BlockManagerId]的映射关系,能够得到BlockId存放在哪些BlockManager中。blockManagerInfo中保存着BlockManagerId到BlockManagerInfo的映射关系,从BlockManagerInfo中就可以得到slave的RpcEndpointRef,从而用来对slave发送Block移除请求。

RemoveExecutor的应答removeExecutor

removeExecutor方法是对RemoveExecutor事件的应答,其定义如下:

1
2
3
4
private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}

移除Executor所有的Block,blockManagerIdByExecutor中记录executor上的BlockManager,然后调用removeBlockManager方法来移除BlockManager。

removeBlockManager方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
blockManagerIdByExecutor -= blockManagerId.executorId
blockManagerInfo.remove(blockManagerId)
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.size == 0) {
blockLocations.remove(blockId)
logWarning(s"No more replicas available for $blockId !")
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
val maxReplicas = locations.size + 1
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
val blockLocations = locations.toSeq
val candidateBMId = blockLocations(i)
blockManagerInfo.get(candidateBMId).foreach { bm =>
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
bm.slaveEndpoint.ask[Boolean](replicateMsg)
}
}
}
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
logInfo(s"Removing block manager $blockManagerId")
}

方法的逻辑很简单,根据BlockManagerId可以找到对应的BlockManagerInfo,BlockManagerInfo中保存着它所管理的Block列表,在blockLocations中,可以根据BlockId找到Block所分布的BlockManagerId(比如副本的分布)。然后调用分布的BlockManager的RpcEndpointRef,来发送ReplicateBlock事件,确保Block数据的副本数。最后利用listenerBus,广播对应的BlockManagerId已经被移除。

StopBlockManagerMaster的应答

StopBlockManagerMaster的应答很多简单,返回true,然后直接调用stop方法。

BlockManagerHeartbeat的应答heartbeatReceived

heartbeatReceived方法用来对BlockManagerHeartbeat事件进行响应,其实现如下:

1
2
3
4
5
6
7
8
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
}
}

在每个BlockManagerInfo中有一个变量_lastSeenMs,用来记录BlockManagerInfo的最后的心跳。通过updateLastSeenMs方法更新这个心跳。

通过上面,我就把BlockManagerMasterEndpoint都梳理了一遍,因此我们也了解到,这些操作,都是基于blockId到BlockManagerId的映射关系、BlockManagerId到BlockMangerInfo的映射关系来实现的。其中BlockManagerInfo保存了有关Block的详细信息。因此有必要对BlockManagerInfo进行一下简单的了解。

BlockManagerInfo

BlockManagerInfo存储了BlockManager的详细信息,如BlockManagerId、最大堆内内存、最大堆外内存、链接BlockManager的rpcEndpointRef、最后心跳时间、最大内存、剩余内存以及BlockManager所管理的BlockId信息。其中管理BlockId信息的集合存储的是BlockId到BlockStatus的映射。
其中最重要的方法是removeBlock和updateBlockInfo。这两个方法会影响BlockManager的内存使用和相关Block的映射关系。
接下来我们对这两个方法进行分析:

removeBlock

removeBlock的定义很简单:

1
2
3
4
5
6
7
def removeBlock(blockId: BlockId) {
if (_blocks.containsKey(blockId)) {
_remainingMem += _blocks.get(blockId).memSize
_blocks.remove(blockId)
}
_cachedBlocks -= blockId
}

_blocks是BlockId到BlockStatus的映射关系。因此在remove的实现中,就是将指定的BlockId从_blocks中移除,并将BlockId所占用的内存释放。这里没有对Block的实际移除进行操作,应该是slave节点执行删除完成后再调用,更BlockManagerMaster中的信息。

updateBlockInfo

updateBlockInfo方法用来更新BlockManagerInfo中对BlockManager的状态,也就是开始说的那些状态。_blocks是BlockId到BlockStatus的映射关系,至于BlockId与BlockManagerId的映射关系,保存在BlockManagerMasterEndpoint中的blockManagerInfo中了。
下面是updateBlockInfo的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def updateBlockInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long) {
updateLastSeenMs()
val blockExists = _blocks.containsKey(blockId)
var originalMemSize: Long = 0
var originalDiskSize: Long = 0
var originalLevel: StorageLevel = StorageLevel.NONE
if (blockExists) {
// The block exists on the slave already.
val blockStatus: BlockStatus = _blocks.get(blockId)
originalLevel = blockStatus.storageLevel
originalMemSize = blockStatus.memSize
originalDiskSize = blockStatus.diskSize
if (originalLevel.useMemory) {
_remainingMem += originalMemSize
}
}
if (storageLevel.isValid) {
/* isValid means it is either stored in-memory or on-disk.
* The memSize here indicates the data size in or dropped from memory,
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
var blockStatus: BlockStatus = null
if (storageLevel.useMemory) {
blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
if (blockExists) {
logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
s" (current size: ${Utils.bytesToString(memSize)}," +
s" original size: ${Utils.bytesToString(originalMemSize)}," +
s" free: ${Utils.bytesToString(_remainingMem)})")
} else {
logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
s" (size: ${Utils.bytesToString(memSize)}," +
s" free: ${Utils.bytesToString(_remainingMem)})")
}
}
if (storageLevel.useDisk) {
blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
if (blockExists) {
logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
s" (current size: ${Utils.bytesToString(diskSize)}," +
s" original size: ${Utils.bytesToString(originalDiskSize)})")
} else {
logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
s" (size: ${Utils.bytesToString(diskSize)})")
}
}
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
} else if (blockExists) {
// If isValid is not true, drop the block.
_blocks.remove(blockId)
_cachedBlocks -= blockId
if (originalLevel.useMemory) {
logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
s" (size: ${Utils.bytesToString(originalMemSize)}," +
s" free: ${Utils.bytesToString(_remainingMem)})")
}
if (originalLevel.useDisk) {
logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
s" (size: ${Utils.bytesToString(originalDiskSize)})")
}
}
}

_blocks中保存了BlockId到BlockStatus的映射关系。BlockStatus中包含的信息有:存储级别、使用的内存size和使用的磁盘size。方法首先对心跳时间进行更新,接着获取判断当前BlockManagerInfo中是否已经包含了要更新的BlockId的信息,如果有,将以存在的BlockStatus从BlockManagerInfo中移除(主要是内存,从剩余内存中恢复占用的内存,因为使用更新数据重新计算剩余内存)。然后,判断存储级别是否有效(内存和磁盘级别的存储并且副本数大于0,才认为有效),如果无效,并且blockId以前存在,则说明BlockId改变了存储级别,则将BlockId从BlockManagerInfo中移除,包括_blocks和_cachedBlocks(存储了当前BlockManagerInfo所管理的所有BlockId);如果存储级别有效,则使用使用新的存储级别、内存size和磁盘size创建一个BlockStatus,然后将BlockStatus存入到_blocks中,对于非广播变量且已经使用了内存或磁盘的BlockId,添加到_cachedBlocks中。
总体来说BlockManagerInfo的updateManagerInfo方法就是用新的BlockId的信息来更新BlockManagerInfo中缓存的信息,只是这种更新不是增长式的是替换式的。

BlockManager

通过上面,我们已经了解到了BlockManagerMaster和BlockManagerMasterEndpoint之间的操作。接下来了解一下另外一部份:BlockManager。
BlockManager相较于BlockManagerMaster,它的功能就复杂多了。它会涉及SerializerManager、MemoryManager、MapOutputTracker、ShuffleManager、BlockTransferService、SecurityManager的联合操作。

Spark 2.11 Spark Ui中 Kill Job

发表于 2019-04-23   |   分类于 spark 2.11

本文记录在Spark Applications UI中Kill job时的流程,其实也就是cancel Job的流程。

界面点击kill链接,会执行/jobs/job/kill?id=${jobId}。首先看看SparkUI中对于这个链接的注册:

1
2
3
4
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST")))

从上面我们可以看出,在点击了/jobs/job/kill链接后,会交由 jobsTab.handleKillRequest来进行响应处理。
JobsTab.handleKillRequest方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
// stripXSS is called first to remove suspicious characters used in XSS attacks
val jobId = Option(UIUtils.stripXSS(request.getParameter("id"))).map(_.toInt)
jobId.foreach { id =>
if (jobProgresslistener.activeJobs.contains(id)) {
sc.foreach(_.cancelJob(id))
Thread.sleep(100)
}
}
}
}

上面的代码首先验证用户权限,然后获取job的id,最后调用SparkContext的cancelJob方法来取消Job。
在SparkContext中,cancel方法只是简单的调用了DAGScheduler的scancelJob方法。

1
2
3
def cancelJob(jobId: Int): Unit = {
dagScheduler.cancelJob(jobId, None)
}

而DAGScheduler的scancelJob方法,只是将一个JobCancelled对象加入到DAGScheduler的eventProcessLoop(具体实现为DAGSchedulerEventProcessLoop对象)队列中。于是我们转到DAGSchedulerEventProcessLoop的doOnReceive方法中。

1
2
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)

于是,又将事件交给 DAGScheduler的handleJobCancellation方法处理了。handleJobCancellation的定义如下:

1
2
3
4
5
6
7
8
private[scheduler] def handleJobCancellation(jobId: Int, reason: Option[String]) {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
failJobAndIndependentStages(
jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason.getOrElse("")))
}
}

方法中,首先会检测jobId是否归属与某个Stage,只有Job归属于某个Stage,才会将job进行处理,调用failJobAndIndependentStages方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private def failJobAndIndependentStages(
job: ActiveJob,
failureReason: String,
exception: Option[Throwable] = None): Unit = {
val error = new SparkException(failureReason, exception.getOrElse(null))
var ableToCancelStages = true
val shouldInterruptThread =
if (job.properties == null) false
else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
// Cancel all independent, running stages.
val stages = jobIdToStageIds(job.jobId)
if (stages.isEmpty) {
logError("No stages registered for job " + job.jobId)
}
stages.foreach { stageId =>
val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
logError(
"Job %d not registered for stage %d even though that stage was registered for the job"
.format(job.jobId, stageId))
} else if (jobsForStage.get.size == 1) {
if (!stageIdToStage.contains(stageId)) {
logError(s"Missing Stage for stage with id $stageId")
} else {
// This is the only job that uses this stage, so fail the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
markStageAsFinished(stage, Some(failureReason))
} catch {
case e: UnsupportedOperationException =>
logInfo(s"Could not cancel tasks for stage $stageId", e)
ableToCancelStages = false
}
}
}
}
}
if (ableToCancelStages) {
// SPARK-15783 important to cleanup state first, just for tests where we have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in the tests.
cleanupStateForJobAndIndependentStages(job)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}

这块的逻辑大概如下:首先从配置中获取spark.job.interruptOnCancel参数的值,用于决定在cancel task的时候是否要中断线程。然后根据jobId,获取它所处于的Stage。根据Stage的id,调用taskScheduler取消task,并将Stage标记为完成。
其中取消task的代码为:

1
taskScheduler.cancelTasks(stageId, shouldInterruptThread) //shouldInterruptThread用于表示是否要中断线程,如果不中断,则使用其他的方式来结束线程。稍后介绍。

继续跳转到TaskScheduler的cancelTasks方法,这个方法是抽象方法,具体的实现是由TaskSchedulerImpl来实现的。那么我们看一下 TashSchedulerImpl的cancelTasks方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
attempts.foreach { case (_, tsm) =>
// There are two possible cases here:
// 1. The task set manager has been created and some tasks have been scheduled.
// In this case, send a kill signal to the executors to kill the task and then abort
// the stage.
// 2. The task set manager has been created but no tasks has been scheduled. In this case,
// simply abort the stage.
tsm.runningTasksSet.foreach { tid =>
taskIdToExecutorId.get(tid).foreach(execId =>
backend.killTask(tid, execId, interruptThread, reason = "Stage cancelled"))
}
tsm.abort("Stage %s cancelled".format(stageId))
logInfo("Stage %d was cancelled".format(stageId))
}
}
}

cancelTasks的实现逻辑是:根据stageId得到运行task set的TaskSetManager,然后根据TaskSetManager得到正在运行的task集合,根据task的id从taskIdToExecutorId映射关系中得到运行task的Executor的id,这样调用SchedulerBackend的killTask方法,将executorId和taskId作为参数,杀掉task。
对于SchedulerBackend的实现,我们在启动executor的时候就知道了,对应的实现为CoarseGrainedSchedulerBackend。
CoarseGrainedSchedulerBackend.killTask

1
2
3
4
override def killTask(
taskId: Long, executorId: String, interruptThread: Boolean, reason: String) {
driverEndpoint.send(KillTask(taskId, executorId, interruptThread, reason))
}

于是,就向driver发送了一个 KillTask对象。driver会通过endpoint向executorId所对应的Executor发送kill task的命令。我们跳到Executor中killTask的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
val taskRunner = runningTasks.get(taskId)
if (taskRunner != null) {
if (taskReaperEnabled) {
val maybeNewTaskReaper: Option[TaskReaper] = taskReaperForTask.synchronized {
val shouldCreateReaper = taskReaperForTask.get(taskId) match {
case None => true
case Some(existingReaper) => interruptThread && !existingReaper.interruptThread
}
if (shouldCreateReaper) {
val taskReaper = new TaskReaper(
taskRunner, interruptThread = interruptThread, reason = reason)
taskReaperForTask(taskId) = taskReaper
Some(taskReaper)
} else {
None
}
}
// Execute the TaskReaper from outside of the synchronized block.
maybeNewTaskReaper.foreach(taskReaperPool.execute)
} else {
taskRunner.kill(interruptThread = interruptThread, reason = reason)
}
}
}

这里就是kill task的具体实现,从代码中我们可以知道,kill task有两种逻辑:

直接调用TaskRunner对象的kill,什么也不操作。
除了调用TaskRunner对象的kill,还会使用Reaper对Task进行dump操作。

如果要是启用Reaper,需要开启spark.task.reaper.enabled参数。

对于Reaper的实现逻辑:调用TaskRunner的kill,然后对Task对应的线程进行dump操作。关于reaper还有好几个参数,用来定义Reaper的逻辑:

直接调用TaskRunner的kill,就是不进行Reaper监控。而且我们也知道了,使用Reaper进行监控,其内部也会调用TaskRunner的kill,所以下面我们进行TaskRunner.kill方法的逻辑分析:

1
2
3
4
5
6
7
8
9
10
11
def kill(interruptThread: Boolean, reason: String): Unit = {
logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason")
reasonIfKilled = Some(reason)
if (task != null) {
synchronized {
if (!finished) {
task.kill(interruptThread, reason)
}
}
}
}

TaskRunner中保存着Task的引用,调用Task的kill方法。

1
2
3
4
5
6
7
8
9
10
def kill(interruptThread: Boolean, reason: String) {
require(reason != null)
_reasonIfKilled = reason
if (context != null) {
context.markInterrupted(reason)
}
if (interruptThread && taskThread != null) {
taskThread.interrupt()
}
}

这里有三个逻辑:

设置_reasonIfKilled,如果Task还没有运行,设置了这个值后,task运行时会直接退出。一旦task运行了,设置这个值是不会停止的,因此需要第二个方式。
设置context.markInterrupted(reason),一旦Task运行了,就会在其内部构建一个TaskContext,调用这个方法,进行状态标记。在执行RDD中数据读取(调用InterruptibleIterator的next方法)的时候会调用TaskContext的killTaskIfInterrupted方法进行中断验证,如果已经调用过markInterrupted方法,killTaskIfInterrupted方法就会抛出异常,终止Task的运行。
参数指定要中断线程,直接执行线程中断。

这里在附加一下InterruptibleIterator的实现:

1
2
3
4
5
6
7
8
9
10
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {
def hasNext: Boolean = {
context.killTaskIfInterrupted()
delegate.hasNext
}
def next(): T = delegate.next()
}

其实就是对dlegate所指定的Iterator进行了一层包装。查找InterruptibleIterator的使用,RDD的getOrCompute方法返回的Iterator就是InterruptibleIterator类型。
我们知道Task读取数据时,就会用到RDD的getOrCompute方法,因此就可以让task中断啦。对于InterruptibleIterator的使用,不只是RDD,还有BlockStoreShuffleReader、NewHadoopRDD等都在使用。

Spark 2.11 Submit的流程

发表于 2019-02-19   |   分类于 spark 2.11

本文用于整理在使用spark-submit提交任务的流程

spark-submit脚本的定义

使用spark-submit提交任务的时候,实际上调用的是${SPARK_HOME}/bin/spark-submit来提交的。例如:

1
2
3
4
5
6
7
8
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

如下是${SPARK_HOME}/bin/spark-submit脚本的定义:

1
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

从代码可以看出,实际上执行的是 org.apache.spark.deploy.SparkSubmit类。因此我们具体看看这个类的实现。

SparkSubmit的启动流程

如下是SparkSubmit主函数的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}

进入主函数后,会使用SparkSubmitArguments类对SparkSubmit的命令行参数进行解析。然后根据参数中的action信息进行具体的操作。
由此也可以看出,SparkSubmit支持三种action:–submit、–kill和–status。

SparkSubmitArguments

SparkSubmitArgument的继承关系

image.png
其中SparkSubmitArgumentsParser是没有具体实现,SparkSubmitOptionParser主要用来解析option。

SparkSubmitArguments实例化执行

在SparkSubmit中会利用main的参数生成一个SparkSubmitArguments的,生成SparkSubmitArguments对象的时候就会执行如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try {
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()
validateArguments()

首先调用的是parse方法,而SparkSubmitArguments本身是没有这个方法的,但是它会从父类SparkSubmitOptionParser那里继承,因此调用的父类的parse方法。

SparkSubmitOptionParser

SparkSbumitOptionParser用来解析命令行提供的参数,从SparkSubmitOptionParser中我们可以看出,可以使用的参数列表如下:

标准参数
option的定义 意义 例如
–class ‘’ –
–conf ‘’,可以使用“-c”来代替 –
–deploy-mode 部署模式,有两种模式:client和cluster –
–driver-class-path ‘’ –
–driver-cores ‘’ –
–driver-java-options ‘’ –
–driver-library-path ‘’ –
–driver-memory ‘’ –
–executor-memory 执行application的内存大小 –
–jars ‘’ –
–kill ‘’ –
–master ‘’ –master yarn-client
–name 提交的application的名字 –name spark_thrift_server_test
–packages ‘’ –
–exclude-packages ‘’ –
–properties-file ‘’ –
–proxy-user 提交任务所用的代理用户 –proxy-user myUser
–py-files ‘’ –
–repositories ‘’ –
–status ‘’ –
–total-executor-cores 执行application的最大executor的数量 –total-executor-cores 20
标识型参数。

这些option只会作为检测,不会取值。
|option的定义|意义|例如|
|-|-|-|
|–help|‘’,可以使用“-h”代替|–|
|–supervise|‘’|–|
|–usage-error|‘’|–|
|–verbose|‘’,可以使用“-v”代替|–|
|–version|‘’|–|

Yarn独享参数
option的定义 意义 例如
–archives ‘’ –
–executor-cores 执行任务的executor的core的数量 –executor-cores 3
–keytab ‘’ –
–num-executors 执行任务的executor的个数 –num-executors 10
–principal ‘’ –
–queue 执行任务的资源队列 –queue test_queue
option的定义和解析
非标识型参数

parse方法,实现了对命令行参数的解析。

1
Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

定义了获取参数的模式,必须以“–”开头,然后是不能为“=”的任意字符,接着是“=”号,最后是任意字符,例如:–name=testName。
但是这也不是必须的格式,在解析参数的代码中还包含了另外一种逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
arg = m.group(1);
value = m.group(2);
}
// Look for options with a value.
String name = findCliOption(arg, opts);
if (name != null) {
if (value == null) {
if (idx == args.size() - 1) {
throw new IllegalArgumentException(
String.format("Missing argument for option '%s'.", arg));
}
idx++;
value = args.get(idx);
}
if (!handle(name, value)) {
break;
}
continue;
}

可以第一个参数为key,第二个参数为value。
也就是说命令行参数中包含的参数,要么是key=value的格式,要么是key value的模式。也可以是两种格式的组合,但是必须要保证key value的匹配,如果key key=value,那么后面的这个key=value将会作为前面key的value来解析。另外,如果参数的最后一个为key,还会抛出IllegalArgumentException异常。
如下传递是正确的

1
key1 value1 key2=value2 key3 value3

如下传递是错误的

1
2
key1 key2=value2 -- 会把key2=value2当做key1的value
key1=value2 key2 -- 抛出IllegalArgumentException,无法获取key2的value

还有一个需要注意的是,如果提供了不在上面列表中的参数,会抛出UnsupportedOperationException异常,并退出程序的执行。

标识型参数

标志性参数只会检测这个参数是否存在,不会从命令行中读取值。

参数的处理

上面参数解析后,就会得到key和value。然后调用handle方法做进一步处理。SparkSubmitOptionParser要求子类必须实现handle方法,如果子类未实现handle方法,就会调用自身的handle方法,从而抛出UnsupportedOperationException异常。因此,我们看一下子类SplarkSubmitArguments的handle方法。

SparkSubmitArgumnet.handle

方法逻辑很简单,就是检测上面的key是否合法(规定的),如果合法,就设置相应属性的值。如下是合法参数名的值,以及value存储对应SparkSubmitArgumengt中变量,以及value的合法性要求:
|key的定义|value的存储变量|value的合法性要求|
|-|-|-|
|–name|name|-|
|–master|master|-|
|–class|mainClass|-|
|–deploy-mode|deployMode|client或cluster|
|–num-executors|numExecutors|-|
|–total-executor-cores|totalExecutorCores|-|
|–executor-cores|executorCores|-|
|–executor-memory|executorMemory|-|
|–driver-memory|driverMemory|-|
|–driver-cores|driverCores|-|
|–driver-class-path|driverExtraClassPath|-|
|–driver-java-options|driverExtraJavaOptions|-|
|–driver-library-path|driverExtraLibraryPath|-|
|–properties-file|propertiesFile|-|
|–kill|value保存在submissionToKill中,同时设置action = KILL|-|
|–status|value保存在submissionToRequestStatusFor中,同时设置action = REQUEST_STATUS|-|
|–supervise|设置supervise=true|-|
|–queue|queue|-|
|–files|files|-|
|–py-files|pyFiles|-|
|–archives|archives|-|
|–jars|jars|-|
|–packages|packages|-|
|–exclude-packages|packagesExclusions|-|
|–repositories|repositories|-|
|–conf或-c|解析properties文件后保存在sparkProperties中|-|
|–proxy-user|proxyUser|-|
|–principal|principal|-|
|–keytab|keytab|-|
|–help|执行printUsageAndExit(0),打印用法并退出|-|
|–verbose|设置verbose为true|-|
|–version|调用printVersionAndExit()方法,打印版本并退出|-|
|–usage-error|执行printUsageAndExit(1),打印用法并退出|-|

参数的解析流程

parse方法定义在SparkSubmitOptionParser类中,用于解析命令行传递过来的参数。参考上面的“option的定义和解析”部分。

合并默认的Spark属性

合并默认的Spark属性,调用的是mergeDefaultSparkProperties方法。
该方法将会从两个位置读取配置文件:

命令行中-–properties-file参数指定的属性文件。
SPARK_CONF_DIR目录或SPARK_HOME/conf下的spark-default.properties文件

其中优先使用–properties-file参数所指定的文件。
根据这里确定的属性文件,将属性加载到内存中作为默认属性与命令行中使用–conf或–c配置的属性进行合并,优先使用–conf或–c指定的属性。

驳回非Spark属性

什么属于非Spark属性呢?就是那些不以“spark”开头的属性。将上面合并后的属性进行遍历,将不是以“spark”开头的属性,从属性集合中移除。

加载环境参数

加载环境参数的意思就是对于那些通过上面操作,依然没有被设置的属性,从环境配置中再加在一次。基本上的思路就是判断属性是否已经有值,如果没有则从上面的属性中加在一次,如果还没有则再从环境中加在一次。所以参数的优先级如下:命令行中指定 -> 属性中配置(–conf-> –properties-file -> spark_home/conf/spark-default.properties) -> 环境

变量 属性中的配置项 环境中的配置项
master spark.master MASTER
driverExtraClassPath spark.driver.extraClassPath Null
driverExtraJavaOptions spark.driver.extraJavaOptions Null
driverExtraLibraryPath spark.driver.extraLibraryPath Null
driverMemory spark.driver.memory SPARK_DRIVER_MEMORY
driverCores spark.driver.cores Null
executorMemory spark.executor.memory SPARK_EXECUTOR_MEMORY
executorCores spark.executor.cores SPARK_EXECUTOR_CORES
totalExecutorCores spark.cores.max Null
name spark.app.name Null
jars spark.jars Null
files spark.files Null
ivyRepoPath(新增) spark.jars.ivy Null
packages spark.jars.packages Null
packagesExclusions spark.jars.excludes Null
deployMode spark.submit.deployMode DEPLOY_MODE
numExecutors spark.executor.instances Null
queue spark.yarn.queue Null
keytab spark.yarn.keytab Null
principal spark.yarn.principal Null

基本的设置就是以上这些,但是除了这些,还有其他一些逻辑:

主类的确定

当没有通过mainClass指定主类,且不是python或R时,会从jar包中读取主类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
val uriScheme = uri.getScheme()
uriScheme match {
case "file" =>
try {
val jar = new JarFile(uri.getPath)
// Note that this might still return null if no main-class is set; we catch that later
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
}

master的确定

如果没有设置master,则将master设置为local[*]

name的确定

如果master是以“yarn”开头的,则使用当前的name,如果没有设置,则从环境变量SPARK_YARN_APP_NAME中获取;如果环境中也没有,则从主类中获取,如果主类中也没有,则从primaryResource中获取。

1
2
3
4
5
6
7
8
9
if (master.startsWith("yarn")) {
name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
}
// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
name = Utils.stripDirectory(primaryResource)
}

action的确定

如果没有设置action,则将action默认设置为submit。

验证参数

验证参数是要根据action,进行区分验证,不同的action有不同的参数要求。

submit

对于提交参数的验证,要求如下

必须指定主要资源:也就是说jar、python或R不能同时为空。
主类必须通过–class指定或者在Jar中包含。
如果指定了pyFile,则要求主资源必须是Python脚本。
如果master是yarn,则要求环境变量中必须包含HADOOP_CONF_DIR或YARN_CONF_DIR
配置中不能同时存在 proxyUser和principal。

为什么proxyUser和principal不能同时存在呢?因为这是两种不同的认证方式,只能使用一种。对于proxyUser方式,会调用Hadoop的相关API创建代理用户,然后用代理用户执行runMain方法。如果设置了principal,就必须设置–keytab来指定keytab文件,然后会使用keytab信息进行登录。

kill

对于kill参数的验证,要求如下

master必须是以spark://或mesos://开头的,其他的不支持kill
submissionToKill必须指定,也就是必须指定要kill的submission。

status

对于请求状态的验证,要求如下

master必须是以spark://或mesos://开头的,其他的不支持status查询
submissionToRequestStatusFor必须指定,也就是必须说明要查询状态的信息。

至此SparkSubmitArguments对参数的加载(从命令行、配置文件、环境变量)和验证就完成。我们继续回到SparkSubmit中。

Spark Submit

我们已经知道SparkSubmit类支持三种action,现在我们先看看当action为submit时的相关操作。
用来处理action为“submit”的是submit方法,在submit方法中,大体分为三个步骤:提交环境准备、根据代理用户进行操作、根据部署模式进行操作。
对于代理用户的相关操作,就是判断是否指定了代理用户,如果指定了代理用户,则使用代理用户的身份执行runMain,如果没有指定代理用户,则直接执行runMain(相当于使用当前用户)。
对于部署模式的相关操作,基本上都是调用doRunMain方法,只是对于standalone模式下,如果出现异常会做一些其他操作。doRunMain方法,就是根据代理用户进行操作。
所以,这里会将主要任务落到两个方法上:runMain和prepareSubmitEnvironment。

prepareSubmitEnvironment

提交前会进行环境的准备,环境准备通过prepareSubmitEnvironment方法实现。该方法的代码量很大,但是基本上就是验证参数的正确性、参数的合法性、某些未写参数的补充、以及执行类的确定。
这里需要注意下参数的变换,我们上面已经知道配置参数可以通过命令行、命令行的配置文件、默认配置文件和环境变量中得到。这里再生成下一个主类使用的参数时,参数只会包含三个类型:–class、–jar、–arg(对于python会包含–primary-py-file,对于R会包含–primary-r-file)。所以那些属性会作为–arg进行提供。

runMain

runMain就是使用prepareSubmitEnvironment确定的环境变量和属性来执行prepareSubmitEnvironment中确定的主类,直接执行主类的main方法。

例如,对于Yarn集群模式,执行的就是org.apache.spark.deploy.yarn.Client。

Client

Client.submitApplication

org.apache.spark.deploy.yarn.Client类中submitApplication方法实现了application的提交逻辑,基本流程如图:
image.png

创建证书

创建证书是通过setupCredentials方法实现的,其定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def setupCredentials(): Unit = {
// 判断是否是使用Kerberos进行登录,如果配置中设置了principal就表示使用kerberos,而且要求配置了Keytab信息
loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
if (loginFromKeytab) {
principal = sparkConf.get(PRINCIPAL).get
keytab = sparkConf.get(KEYTAB).orNull
require(keytab != null, "Keytab must be specified when principal is specified.")
// 加载Keytab文件,并对文件名
val f = new File(keytab)
amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
sparkConf.set(PRINCIPAL.key, principal)
}
// 创建当前用户的证书的拷贝
credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
}

创建yarnClient并创建Application

submitApplication方法在设置完证书后,就会使用yarnClient来创建Application,并得到Applcation的相关信息(包括application id、application submission context等)以供后面使用,其代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
// 初始化yarnClient,并启动
yarnClient.init(yarnConf)
yarnClient.start()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// 并得到新application的响应信息,并从响应信息中得到application的id
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()

资源验证

资源验证的目的就是检查Yarn集群中是否有足够的内存来运行Application Master,该功能通过verifyClusterResources方法来实现,AM所需的内存资源为内存和负载内存之和。
对于内存和负载内存的配置值,会根据集群模式的不同而取值不同。
对于集群模式,会从spark.driver.memory和spark.yarn.driver.memoryOverhead配置中读取,对于非集群模式,则会从spark.yarn.am.memory和spark.yarn.am.memoryOverhead中读取。
如果没有设置负载内存,负载内存还有一个推算公式:max((0.10 * 内存), 384L)

创建Container启动上下文

创建Container启动上下文是通过createContainerLaunchContext方法实现。对于这个方法,其功能大致分为三个部分:启动环境准备、资源准备和启动命令的拼接。

启动环境准备

启动环境的准备是通过setupLaunchEnv方法实现的。 – 以后补充

资源准备

资源的准备是通过prepareLocalResources方法实现的。下面将详细介绍这个方法。

证书的管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 从证书管理器那里获得证书下一次更新的时间
val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
// 如果已经有证书了,则将证书设置给当前用户
if (credentials != null) {
UserGroupInformation.getCurrentUser.addCredentials(credentials)
logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
}
// 如果我们使用principal和keytab登录,那么证书需要在之后的时间被重新构建,我们应当将下一次的重构和更新时间传递给构建者和更新者
if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
nearestTimeOfNextRenewal != Long.MaxValue) {
val currTime = System.currentTimeMillis()
val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
}

证书管理的功能就是:如果证书存在,将证书添加到当前用户中;根据时间设置证书的重新生成时间和更新时间。

资源添加和验证
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 用于保存添加到分发缓存的URI列表,如果同一个URI被添加多次,YARN将以内部错误使container启动失败
val distributedUris = new HashSet[String]
// 用于保存添加到分发缓存的URI是否有相同的名字,如果有相同的名字的被提交多次,但是文件路径不同,Yarn将以内部错误是container启动失败
val distributedNames = new HashSet[String]
val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
.getOrElse(fs.getDefaultReplication(destDir))
// 用来保存本地资源的集合
val localResources = HashMap[String, LocalResource]()
// 在HDFS上创建 staging目录,并将目录的访问权限设置为700
FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
// 保存文件的文件状态,不是以文件为key而是一个文件的URI作为key
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
// 获取要分发的文件名
val fileName = new File(uri.getPath).getName
// 如果URI已经被添加多次,添加失败,如果文件名也被添加多次,添加失败,否则添加成功
if (distributedUris.contains(uriStr)) {
logWarning(s"Same path resource $uri added multiple times to distributed cache.")
false
} else if (distributedNames.contains(fileName)) {
logWarning(s"Same name resource $uri added multiple times to distributed cache")
false
} else {
distributedUris += uriStr
distributedNames += fileName
true
}
}

distributedUris和distributedNames用来进行资源添加的验证(在addDistributedUri中使用),在添加资源的时候,资源是以URI的方式来添加,对于同一个URI只允许添加一次,并且如果URI不同,但是文件名相同的情况,也会验证,同一个文件名的文件也只允许添加一次。另外代码中还设置了资源文件的存放位置((spark.yarn.stagingDir|~)/.sparkStaging/application_id)、资源文件的副本数(3)以及资源文件目录的权限(700)。addDistributedUri方法用来进行验证。

资源分发操作

distribute方法是资源分发的实现。具体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def distribute(
path: String,
resType: LocalResourceType = LocalResourceType.FILE,
destName: Option[String] = None,
targetDir: Option[String] = None,
appMasterOnly: Boolean = false): (Boolean, String) = {
val trimmedPath = path.trim()
val localURI = Utils.resolveURI(trimmedPath)
// 判断URI是否是以local开头的
if (localURI.getScheme != LOCAL_SCHEME) {
// 将URI添加到分发缓存,同时会对URI和FileName进行验证,不允许重复添加
if (addDistributedUri(localURI)) {
val localPath = getQualifiedLocalPath(localURI, hadoopConf)
// 拼接在混存中保存的名字
val linkname = targetDir.map(_ + "/").getOrElse("") +
destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
// 保存文件的缓存子目录
val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
// 目标文件系统,HDFS
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
// 将文件添加到缓存
// destFs 是HDFS文件系统
distCacheMgr.addResource(
destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
appMasterOnly = appMasterOnly)
(false, linkname)
} else {
(false, null)
}
} else {
(true, trimmedPath)
}
}

这个方法的作用就是将不是以“local”开头的文件添加到分发缓存中(调用addDistributedUri方法),添加的时候会进行验证。copyFileToRemote方法的作用,就是将本地的文件(file://)上传到HDFS上,copyFileToRemote方法中会进行两个文件系统的对比,只有当源文件系统和目标文件系统不同(不同的HDFS或一个是HDFS一个是普通文件系统)才会进行复制文件。最后调用ClientDistributedCacheManager的addResource方法将文件加入到Resource列表中供后面启动container使用。

分发Keytab文件
1
2
3
4
5
6
7
8
if (loginFromKeytab) {
logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
" via the YARN Secure Distributed Cache.")
val (_, localizedPath) = distribute(keytab,
destName = Some(amKeytabFileName),
appMasterOnly = true)
require(localizedPath != null, "Keytab file already distributed.")
}

amKeytabFileName是添加了UUID后缀的名字,作用就是调用distribute方法,将keytab文件分发,但是只是分发给Application Master。

jar文件的分发逻辑

在对Jar进行分发的时候,会有三种情况:

设置了spark.yarn.archive
设置了spark.yarn.jars
spark.yarn.archeive和spark.yarn.jars都没有设设置

当同时设置了spark.yarn.archive和spark.yarn.jars时,spark.yarn.archive优先级高。

spark.yarn.archive

如果配置了spark.yarn.archive(目录,且要求必须不是以local开头的文件系统),则将spark.yarn.archive配置的目录作为ARCHIVE类型的资源分发到“spark_libs”目录中。

spark.yarn.jars

如果配置了spark.yarn.jars(必须是文件,多个文件用逗号分隔),则将spark.yarn.jars中的每个文件作为FILE类型资源分发到“spark_libs”目录中。
对于local文件(以local://)开头的文件,会将其重新设置到sparkConf中的“spark.yarn.jars”配置项中。

上传${SPARK_HOME}下的jars

对于既没有配置spark.yarn.archive又没有配置spark.yarn.jars,那么系统会将环境变量${SPARK_HOME}中指定的目录下的jars或assembly/target/scala-%{scala_version}}/jars目录中的jar打包为一个“spark_libs.zip”文件,然后将这个文件作为ARCHIVE类型分发到“spark_libs”目录。

看了三种情况,这里有一个问题,对于配置了spark,yarn.archive和什么都没有配置的情况,都是将URI作为ARCHIVE类型资源分发到“spark_libs”目录中,那么这两个参数还有什么作用么?其实最主要的区别就是在进行文件拷本的时候,也就是调用copyFileToRemote方法的时候,可以减少上传操作。

如下是jar文件分发的逻辑实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
// 如果定义了 spark.yarn.archive
if (sparkArchive.isDefined) {
val archive = sparkArchive.get
// 要求 spark.yarn.archive 的value不是是本地目录(local://开头)
require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
distribute(Utils.resolveURI(archive).toString,
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_LIB_DIR))
} else {
sparkConf.get(SPARK_JARS) match {
case Some(jars) =>
// Break the list of jars to upload, and resolve globs.
// 将spark.yarn.jars中配置的jar(非本地jar)进行分发
val localJars = new ArrayBuffer[String]()
jars.foreach { jar =>
if (!isLocalUri(jar)) {
// 得到jar的路径
val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
// 得到文件系统
val pathFs = FileSystem.get(path.toUri(), hadoopConf)
pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
val uri = entry.getPath().toUri()
statCache.update(uri, entry)
// 分发jar
distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
}
} else {
localJars += jar
}
}
sparkConf.set(SPARK_JARS, localJars)
case None =>
// No configuration, so fall back to uploading local jar files.
// 没有配置 spark.yarn.archive和spark.yarn.jars 需要上传本地的jar包
logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
"to uploading libraries under SPARK_HOME.")
// 在Spark home目录下查找存放jar的目录
val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
sparkConf.getenv("SPARK_HOME")))
// 创建一个名为 __spark_libs__.zip的
val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
new File(Utils.getLocalDir(sparkConf)))
// 利用 __spark_libs__.zip创建输出流
val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
// 将jar目录下的所有jar文件,添加到__spark_libs__.zip的输出流中,这里其实就是将 jar打包成一个 __spark_libs__.zip的文件
try {
jarsStream.setLevel(0)
jarsDir.listFiles().foreach { f =>
if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
jarsStream.putNextEntry(new ZipEntry(f.getName))
Files.copy(f, jarsStream)
jarsStream.closeEntry()
}
}
} finally {
jarsStream.close()
}
// 将打包好的__spark_libs__.zip进行分发, 分发到__spark_libs__目录下
distribute(jarsArchive.toURI.getPath,
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_LIB_DIR))
jarsArchive.delete()
}
}

看到代码的实现,这里还有一个问题,对于配置spark.yarn.jars的情况,如果其中的jar是以“local://”开头的,会将这些文件加入到列表中,然后用这个列表来重新设置spark.yarn.jars的配置,那么后续还会怎么处理呢?也就是说对于以local://开头配置的文件,要怎么处理呢?

分发用户Jar包

对于用户的jar包(通过–jar参数指定的),会将其以“app.jar”进行分发

1
2
3
4
5
6
7
8
9
Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
if (isLocal) {
require(localizedPath != null, s"Path $jar already distributed")
// If the resource is intended for local use only, handle this downstream
// by setting the appropriate property
sparkConf.set(APP_JAR, localizedPath)
}
}

其他需要分发的

除了上面那些分发的内容,用户还可以设置spark.yarn.dist.jars、spark.yarn.dist.files或spark.yarn.dist.archives配置项来制定自己要分发的文件,对于spark.yarn.dist.jars指定的jar,会被添加到classPath中,另外两个不会。需要注意的是这些文件不能和之前上传的URI或文件名相同,否则不会分发。然后这些值会作为配置项“spark.yarn.secondary.jars”的信息进行设置。对于那些AM不需要,但是executor需要的jar,可以通过这种方式来配置。

配置项的上传

除了上面的jar包、文件的分发,系统还会上传,配置项会被上传到(spark.yarn.stagingDir|~)/.sparkStaging/application_id/spark_conf.zip位置。
上传的内容配置内容如下,“spark.overlay.hadoop.conf.filenames”配置项中指定的配置文件、log4j.properties、metrics.properties、环境变量“HADOOP_CONF_DIR”目录中的文件、环境变量“YARN_CONF_DIR”目录中的文件。其中如果有相同的配置文件,那么最后两个环境变量中的文件优先级最低。除了这些文件,系统还会将内存中sparkConf的属性和Keytab文件中的属性,以“spark_conf.properties”进行保存,一起添加到压缩文件spark_conf.zip中,一起上传到spark.yarn.stagingDir|~)/.sparkStaging/application_id/spark_conf.zip位置。

这些资源如何传递给container呢?答案是生成ContainerLaunchContext时(Records.newRecord(classOf[ContainerLaunchContext])–这样创建),作为localResources属性设置的。除了localResource,还有一个environment需要设置,环境信息和资源信息一样,也会上传到“(spark.yarn.stagingDir|~)/.sparkStaging/application_id/”目录下。

启动命令的拼接

启动命令的拼接就是为了要拼成 /bin/java -server ${-javaOpts} ${amArg}这样的命令来启动另外一个java类。对于要启动哪个类,如下判断:

对于集群模式将会启动org.apache.spark.deploy.yarn.ApplicationMaster对象,赋予非集群模式将启动org.apache.spark.deploy.yarn.ExecutorLauncher对象。

对于集群模式,启动ApplicationManster,这样就与之前ApplicationMaster的运行流程连接起来。

创建Application提交上下文

创建Container提交上下文是通过createApplicationSubmissionContext方法实现的。与其说是创建不如说设置,因为这个上下文是通过newApp.getApplicationSubmissionContext得到的,而newApp是通过yarn客户端调用createApplication得到的。
这个方法对Context设置的信息如下

context属性 取值
applicationName spark.app.name配置项,默认为Spark
queue spark.yarn.queue配置项
AMContainerSpec 上面生成的ContainerLaunchContext
applcationType “SPARK”
applicationTags spark.yarn.tags配置项,如果是多值,逐个设置
maxAppAttempts spark.yarn.maxAppAttempts配置项
attemptFailuresValidityInterval spark.yarn.am.attemptFailuresValidityInterval配置项
AMContainerResourceRequest 新生成的ResourceRequest对象,ResourceRequest对象包含的信息,如下表,该配置只有在spark.yarn.am.nodeLabelExpression设置时有效
resource 新生成的Resource对象,包括所需的内存和CPU,该配置只有在spark.yarn.am.nodeLabelExpression没有设置时有效
logAggregationContext 新生成的LogAggregationContext,只有在spark.yarn.rolledLog.includePattern设置时有效

ResourceRequest包含的信息如下
|属性|取值|
|-|-|
|resourceName|*,等|
|priority|Priority对象,默认为0|
|capability|Resource对象,包含内存和cpu|
|numContainers|所需container的数量|
|nodeLableExpression|节点标签表达式 —- 这个在申请资源时有什么作用|

将上面补充好的ApplicationSubmissionContext对象作为参数,通过yarn客户端的submitApplication方法,就完成了application的提交。

得到Application提交上下文之后,便可以调用yarnClient来提交Application。

spark-2-11-metircs

发表于 2019-01-31   |   分类于 spark 2.11

本文按照Metric的基本认识、Spark对Metrics system的配置、源码分析MetricsSystem的顺序进行学习

Metrics

目前最流行的metrics库是dropwizard/metircs,spark使用的也是这个库。下面我们介绍一下dropwizard/metircs的概念和用法。

Maven依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>
</dependencies>

Metric的基本使用

MetricRegistry类是Metrics的核心,他是存放应用中所有metrics的容器,也是我们使用Metrics的第一步:

1
MetricRegistry metricRegistry = new MetricsRegistry();

每个Metrics都有一个唯一的名字,我们可以通过MetricRegistry.name来生成:

1
metricName = MetricRegistry.name("name", "namesecond", "namethrid"); //生成name.namescond.namethrid

有了MetricRegistry和Metric之后,接下来需要进行注册

1
metricRegistry.register(metricName, MetricType)

注册的时候,需要指定Metric的类型,详细的类型,后面会介绍。
以下是Spark中的一些使用参考:

1
2
3
4
5
override val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
override def getValue: Int = threadPool.getActiveCount()
})

Metrics类型

Metrics有五种类型,分别是Gauges、Counters、Meters、Histograms和Timers。

Gauges

Gauges是最简单的Metrics类型,该对象中只有一个方法getValue用于返回统计的值,下面是Gauges接口的定义:

1
2
3
public interface Gauge<T> extends Metric {
T getValue();
}

从方法可以看出,Gauge中的getValue可以返回与Gauge定义类型相同的任意类型。
以下是Spark中Gauges类型Metric的定义:

1
2
3
4
5
6
7
8
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
override def getValue: Int = threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
override def getValue: Long = threadPool.getCompletedTaskCount()
})

分别定义了一个Int类型和一个Long类型的Gauges。

Counters

Counter是计数器,既然是计数器,因此可增可减。Counter其实是对AtomicLong的封装。

Meters支持的方法

inc(): 计数器自加。
dec(): 计数器自减。

生成方法

1
pendingJobs = registry.counter(MetricRegistry.name(Queue.class,"pending-jobs","size"));

Meters

Meters用来计算事件发生的速率。Meters会统计近1分钟、5分钟、15分钟以及全部时间的速率。

Meters可用的方法

mark()方法:表示发生一次事件。

生成方法

1
Meter meterTps = registry.meter(MetricRegistry.name(MeterTest.class,"request","tps"));

Histograms

Histograms统计数据的分布情况。比如最大值、最小值、中间值、中位数、75百分位、90百分位、98百分位、99百分位和99.9百分位的值。

Histograms可用的方法

histogram.update(…): 更新一个数

生成方法

1
2
Histogram histogram - new Histogram(...)
registery.register(MetricRegistry.name(...), histogram)

Timers

Timer可以理解为Histogram和Meter的结合。

Timer可用的方法

timer.time(): 记录时间

生成方法

1
Timer timer = registry.timer(MetricRegistry.name(...))

Metrics的输出

通过JMX

1
2
final JmxReporter reporter = jmxReporter.forRegistry(registry).build()
reporter.start()

一旦reporter被启动,所有在registry中的metrics可以通过JConsole或VisualVM(如果安装了MBeans插件)可见。

其他方式

除了JMX,Metrics还能够以HTTP、STDOUT、CSV、SLFJ、Ganglia和Graphite的方式输出。 详细可以参考:https://metrics.dropwizard.io/3.1.0/getting-started 和 https://metrics.dropwizard.io/3.1.0/manual/core/

Spark中Metrics的配置

Spark的Metrics的配置,在$SPARK_HOME/conf/metrics.properties文件中配置。如果用户需要制定自己的配置文件,可以通过spark.metrics.conf来指定。默认情况下,driver或executor的root命名空间为spark.app.id,但是在某些情况下,用户想要跨driver或executor跟踪Metrics,对于这种情况,可以使用spark.metrics.namespace来自定义命名空间。

实例

在Spark中,Metrics根据Spark的组建被划分中不同的实例。对于每个实例,可以配置一组sinks来报告metrics。如下是当前支持的实例:

实例名 解释
master standalone模式Spark的master进程
application master中的组件,对不同applications进行报告
worker standalone模式Spark的worker进程
executor 一个Spark executor
driver Spark的driver进程
shuffleService Spark的Shuffle服务
applicationMaster 当在Yarn上运行时,Spark的application master

Sinks

每个实例能够汇报给0个或多个sink。Sinks包含在org.apache.spark.metrics.sink包中,有如下sink:

sink 解释
ConsoleSink 记录metrics信息到 console
CSVSink 以一定的间隔,将metrics数据导出到CSV文件
JmxSink 将metrics注册到JMX console
MetricsServlet 在已有的Spark UI中添加一个Servlet,以JONS数据格式来服务metrics数据
GraphiteSink 将metrics发送给Graphite节点
Slf4jSink 将metrics发送给slf4j
StatsdSink 将metrics发送给StatsD节点
GangliaSink 将metrics发送给Ganglia节点,需要重新编译Spark,将gangliaSink打包进去,默认不包含

metrics.properties

可以参考:$SPARK_HOME/conf/metrics.properties.template文件,在这个文件中对source、sink等作了详细的解释,并提供了例子。

从Spark源码看Metrics的使用

Spark中对于Metrics的处理集中在core模块下的org.apache.spark.metrics包中。
包结构如下:
image.png
其中MetricsSystem是该模块的入口,MetricsConfig为模块的配置管理,Source是Metric系统的数据源,Sink是Metrics的量输出。

MetricsConfig

MetricsConfig用来加载metrics的配置,并对MetricsSystem进行相关的配置。在MetricsSystem类中,会生成MetricsConfig对象,并进行初始化操作。

首先看看MetricsConfig的一些属性

正如Spark文档说的,默认的Metrics配置文件为 metrics.properties

initialize方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def initialize() {
// 添加默认属性,用于没有配置文件的情况
setDefaultProperties(properties)
// 从metrics配置文件中加载metrics配置
loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
// 从Spark配置文件中加载metrics配置
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
case _ =>
}
perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
// 判断是否以 * 开始的实例配置,这种表示是所有实例的配置(称为默认实例配置)
if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
// 获取默认实例配置
val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
// 循环所有非默认实例配置,将实例配置添加到非实例配置中(注意,实例配置已有的属性不会被覆盖)
for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
(k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}

在initialize方法中,首先会调用setDefaultProperties方法来加载默认配置,这是为了应对没有设置Metrics配置文件的情况(连默认的metrics.properties都没有提供)。
然后会调用loadPropertiesFromFile方法,从指定的配置文件中加载Metrics配置。
再然后,会读取Spark的所有配置项,从中筛选以”spark.metrics.conf.”为前缀的配置项,并将配置项添加到属性中。需要注意的是,在添加属性的时候,并非以配置项的全名作为属性,而是以子名作为属性名,例如:spark.metrics.conf.xxx,那么会使用xxx作为属性名,而spark.metrics.conf.xxx.x1则会使用xxx.x1作为属性名。
最后,该方法还会对实例属性(指定了实例的配置)进行整理。实例属性的配置类似driver.path这样的,driver就是实例,因此这里就有一个问题,对于所有实例通用的配置应该怎么设置呢?答案是.path这样的。有的则表示通用的默认值,没有的配置,第一个dot(.)前的为实例。有实例的配置会覆盖没有实例的配置。代码中的属性private val INSTANCE_REGEX = “^(\|[a-zA-Z]+)\.(.+)”.r,就解释了实例配置的模式,要不就是以“”开头,要么就以字母开头,以字母开头的表示具体实例,以“”开头的表示所有实例通用。
因此也就可以理解setDefaultProperties方法中设置默认值的作用。整理的作用就是将默认实例配置(以“*”开头的实例配置)添加到非默认实例配置中(不会覆盖非默认实例配置已有的属性)。
比如,有如下属性

1
{"\\*.class"->"default_class", "\\*.path"->"default_path, "driver.path"->"driver_path"}

对于driver实例,他最后得到的属性为

1
{{"driver"->Map("path"->"driver_path", "class"->"default_class"}}

setDefaultProperties

当没有配置metrics配置文件时(是指连默认的配置文件都不存在),采取的默认Metrics配置。

1
2
3
4
5
6
7
8
private def setDefaultProperties(prop: Properties) {
// 默认开启了servlet类型的sink,指定servlet的class为org.apache.spark.metrics.sink.MetricsServlet
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
// 先为所有实例设置,再为master和applications类型的实例设置
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}

loadPropertiesFromFile

方法需要path参数(配置文件的名字-从Spark的配置项spark.metrics.conf中读取),如果给定的path存在,则从这个文件中加载,如果path不存在,则从classPath目录中加载默认配置文件metrics.properties。

MetricsSystem

MetricsSystem类是对外提供的一个对象,从调用来看,在Master、SparkEnv和Work中都有创建MetricsSystem的代码。其中Master和Work是对于standalone模式下创建的。SparkEnv中是对driver和executor进行创建的。生成MetricsSystem的时候是需要指定instance参数的。因此从创建MetricsSystem的代码我们也能够知道MetricsSystem都支持哪些instance。
TODO 对于applications的创建,有时间再找
这里,我们先抛开对MetricsSystem的操作,我们先看看MetricsSystem的实现

创建MetricsSystem

从所有的生成MetricsSyste的代码来看,MetricsSystem都是通过如下代码创建的:

1
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) // 这里创建的一个work实例的MetricsSystem

而对于createMetricsSystem方法的定义也很多简单,就是new了一个MetricsSystem对象:

1
2
3
4
def createMetricsSystem(
instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
}

注册Source

创建了MetricsSystem,接下来就需要向它注册source。我们以Worker中MetricsSystem的使用来作为参考。

1
2
3
metricsSystem.registerSource(workerSource)
metricsSystem.start()
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

因此,我们来看一下MetricsSystem的registerSource方法

1
2
3
4
5
6
7
8
9
def registerSource(source: Source) {
sources += source
try {
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}

将source添加Source列表中,然后调用buildRegistryName方法生成source注册使用的名字,然后进行注册,那么需要看一下buildRegistryName方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private[spark] def buildRegistryName(source: Source): String = {
val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {
if (metricsNamespace.isDefined && executorId.isDefined) {
MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
} else {
// Only Driver and Executor set spark.app.id and spark.executor.id.
// Other instance types, e.g. Master and Worker, are not related to a specific application.
if (metricsNamespace.isEmpty) {
logWarning(s"Using default name $defaultName for source because neither " +
s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
}
if (executorId.isEmpty) {
logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
s"not set.")
}
defaultName
}
} else { defaultName }
}

buildRegistryName方法,就是根据Source,构建一个向MetricRegistry注册的名字。这个名字是唯一的。其基本思路就是根据namespace(从spark.metrics.namespace或spark.app.id中取)和executorId(从spark.executor.id中取)构建一个注册用的名字,但是需要注意的是,是否使用这两个值作为构建的条件,要依赖实例是否是drvier或executor,否则一律使用source的名字生成。
注册好Source,调用MetricsSystem的start方法启动MetricsSystem。

启动start方法

1
2
3
4
5
6
7
8
def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
StaticSources.allSources.foreach(registerSource)
registerSources()
registerSinks()
sinks.foreach(_.start)
}

start方法代码很少,但是包含的东西却挺多。注册静态Source,所谓的静态Source,其实就是不依赖用户配置的Source。接着调用registerSources方法,这个注册是读取配置的文件中当前实例(instance)的配置,对Source的class进行实例化并注册。然后是调用registerSinks方法来注册Sink,也是将配置文件中当前实例(instance)的配置进行操作,对Sink的class进行实例化,并加入到Sink集合中等待调用(调用sink的report方法);最后启动所有的sink。

Source

Source就是Metrics的数据源,我们对Source中的统计数据进行操作,Metrics系统会从已经注册的Source中获取这些数据,然后由Sink报告出去。下面我们以WorkSource来进行分析,看看如何自定义一个Source。

WorkSource

先看一下WorkSource的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[worker] class WorkerSource(val worker: Worker) extends Source {
override val sourceName = "worker"
override val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
...
}

WorkSource中metric的类型只涉及到了Gauge,也就是这些值不需要用户参与。对于需要用户参与的我们可以参考StaticSources,稍后也会看到。WorkSource的逻辑很简单,就是注册了一些metric,并制定了这些metric如何取值。

StaticSources$HiveCatalogMetrics

接下来看一下StaticSources$HiveCatalogMetrics的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object HiveCatalogMetrics extends Source {
override val sourceName: String = "HiveExternalCatalog"
override val metricRegistry: MetricRegistry = new MetricRegistry()
val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
...
def reset(): Unit = {
METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
...
}
// clients can use these to avoid classloader issues with the codahale classes
def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
...
}

这个类中metric的类型为Counter,reset方法、incrementFetchedPartitions方法和incrementFilesDiscovered方法就是对这些Counter进行操作。

Sink

Spark中已经内置了多种Sink,有ConsoleSink、CsvSink、JmxSink、MetricsServlet等,基本可以满足我们的需求了。接下来我们分析一两个常用Sink的实现。

JmxSink

JmxSink的定义如下:

1
2
3
4
5
6
7
8
9
10
11
private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
override def start() {
reporter.start()
}
override def stop() {
reporter.stop()
}
override def report() { }
}

JmxSink继承Sink特征,因此它需要实现start()、stop()和report()方法。另外在JmxSink定义中创建了一个JmxReporter对象,创建reporter对象时需要MetricRegistry对象,这就将Source和Sink中的Report联系了起来(metric注册到Source的MetricRegistry,source注册到MetricsSystem的MetricRegistry,在生成Sink中Reporter的时候会使用MetrcsSystem中的MetricRegistry)。那么Sink是如何报告自己收集的metrics呢?我理解的是,内部的Reporter会自己报告,也可以调用Sink的report方法,report方法再调用reporter的report方法。(—-TODO,Reporter的汇报,会稍后补充)

ConsoleSink

ConsoleSink的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CONSOLE_DEFAULT_PERIOD
}
val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
override def report() {
reporter.report()
}
}

ConsoleSink中也实现了三个必须的方法,除此之外还定义了一个ConsoleReporter。而且它实现的三个方法其实也是对ConsoleReporter进行操作的。查看其他的Sink,也能够发现,每个Sink都有一个Reporter(除了MetricsServlet外,它的逻辑与普通Sink不同)。

Source和Sink的操作

Source的操作

对于Source的操作,我们就参考StaticSources$HiveCatalogMetrics中相关metric的操作吧。
如下是HiveClientImpl的一个方法,他就在操作Source的METRIC_PARTITIONS_FETCHED。

1
2
3
4
5
6
7
8
9
10
11
12
13
override def getPartitions(
table: CatalogTable,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
val parts = spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
case Some(s) =>
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
}
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}

Sink的操作

对于Sink的调用,是在MetricsSystem的report方法中触发的:

1
2
3
def report() {
sinks.foreach(_.report())
}

对于MetricsSystem中report方法的调用,会在各个组建停止的时候调用,进行强制汇报。

Source、Sink、MetricsSystem以及MetricsConfig之间的关系

image.png

自定义Source

要定义自己的Source对象,需要实现org.apache.spark.metrics.source.Source特征,并实现sourceName(String类型)和metricRegistry(MetricRegistry类型)方法。之后定义自己的metric类型即可,可用的metric类型已经在上面的第一部分进行介绍。如下是DAGScheduulerSource的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "DAGScheduler"
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.runningStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waitingStages.size
})
metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.numTotalJobs
})
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
/** Timer that tracks the time to process messages in the DAGScheduler's event loop */
val messageProcessingTimer: Timer =
metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
}

在这个类的定义中,实现了Source特征,并重写了metricRegistry和sourceName方法。接下来注册了自己要统计的metric,默认是以servlet中展示这些信息,我们可以通过www.xxxxx:8088/proxy/application_id/metrics/json来查看对应的输出:

1
2
3
...
"application_1530945921317_6019078.driver.DAGScheduler.job.activeJobs":{"value":2},"application_1530945921317_6019078.driver.DAGScheduler.job.allJobs":{"value":77},"application_1530945921317_6019078.driver.DAGScheduler.stage.failedStages":{"value":0},"application_1530945921317_6019078.driver.DAGScheduler.stage.runningStages":{"value":2},"application_1530945921317_6019078.driver.DAGScheduler.stage.waitingStages":{"value":0},
...

自定义的类截图
image.png
该类会在DAGScheduler中进行实例化:
image.png
然后在SparkContext中进行注册:
image.png
以下是自定义的source注册后,在servlet中的显示
image.png

问题

如果在配置文件中配置自己的source呢?关键是如何定义。。。需要看一下

对于配置文件中metrics的配置格式如下:

[instance].[sink|source].[name].[options] = xxxx
[instance]可以是master、worker、executor、driver或application,也就是明确了实例的类型。如果让所有实例都可以使用,可以使用“*”代替。
[sink|source]指定配置项是sink的信息还是source的信息,只能二选一。
[name]指定sink或source的名字。
[options]指定sink或source的相关属性,如class。

注意点:如果是需要进行配置的自定义source,有两点需要注意。

配置中必须通过class属性指定类名,程序会根据类名进行反射。
配置的自定义Source必须有不含构造参数的构造方法,否则无法实例化。

配置自定义Source

编辑 metrics.properties配置文件
增加如下配置即可

1
driver.source.rwmTest.class=org.apache.spark.metrics.source.RWMTestSource

UI中的显示信息
image.png

配置项

Spark config

配置项 默认值 意义
spark.metrics.conf metrics.properties Metrics系统的配置文件
spark.metrics.conf.xxx 无 将xxx作为属性,添加到属性中,可以将其理解为 metrics.properties中配置的一种转移,会和metrics.properties中的配置项放在一起,而且此配置优先级较高
spark.metrics.namespace 无 metrics的命名空间,参与构建Metrics source注册名的生成
spark.app.id 无 如果spark.metrics.namespace没有指定值,则使用该值作为namespace,参与构建Metrics source注册名的生成
spark.executor.id 无 executor的id,参数构建Metrics source注册名的生成

Metrics config

配置项 默认值 意义
source.(.+).(.+) 无 source的配置信息,就是以source.xxx.xxx形式的,只能有两个dot(.)
sink.(.+).(.+) 无 sink的配置信息,就是以sink.xxx.xxx形式的,只能有两个dot(.)

对于source和sink的配置,参考:$SPARK_HOME/conf/metrics.properties.template

Yarn RestApi

发表于 2019-01-23   |   分类于 yarn

本文记录自己在工作学习期间使用Yarn rest api的一些记录

Yarn rest api 介绍

获取所有的application id

Url的地址

http:///ws/v1/cluster/apps
参考地址:https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API

参数

可以提供的参数

参数名 参数意义
states 限制application的状态
finalStatus 限制最终状态,如:UNDEFINED
user 用户名
queue 所使用的队列
limit 限制返回的数量
startedTimeBegin 与startedTimeEnd配合限制application的start的时间
startedTimeEnd 与startedTimeBegin配合限制application的start的时间
finishedTimeBegin 与finishedTimeEnd配合限制application的完成时间
finishedTimeEnd 与finishedTimeBegin配合限制application的完成时间
applicationTypes 限制application的类型,如SPARK
applicationTags 不知道
deSelects 不知道

Python实现

需要导入requests

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import requests
def queryApplications(host, port, p):
url = 'http://%s:%s/ws/v1/cluster/apps' % (host, port)
reponse = requests.get(url, params=p)
#print reponse.url
apps = json.loads(reponse.text)
if apps:
return apps.get("apps", {})
else:
return None
if "__main__" == __name__:
host = 'yarn.host.com'
port = 8088
params = {}
params['startedTimeBegin'] = changeTimeToNumber(changeStrToTime(getHourBegin()))
params['startedTimeEnd'] = changeTimeToNumber(changeStrToTime(getHourEnd())) * 1000
params['applicationTypes'] = ['SPARK']
apps = queryApplications(host, port, params)

返回值

返回在此Yarn调度过的applicaiton列表

获取单个Application的信息

Url的地址

http:///ws/v1/cluster/apps/{appid}
参考地址:https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Application_API

参数

没有参数

Python实现

1
2
3
4
5
6
7
8
9
10
11
12
13
import requests
def queryApplication(appid, host, port):
url = 'http://%s:%s/ws/v1/cluster/apps/%s' % (host, port, appid)
reponse = requests.get(url)
#print reponse.url
res = json.loads(reponse.text)
return res
if "__main__" == __name__:
host = 'yarn.host.com'
port = 8088
app = queryApplication('application_1476912658570_0002', host, port)

返回值

返回application的详细信息,可以参考URL处提供的地址

spark-2.11-thriftServer

发表于 2019-01-18

本文记录Thrift server的学习,先从一个服务异常为切入点:

Thrift Server的本地启动

– 感谢韦大侠帮助
直接运行 HiveThriftServer2是无法正常启动的,首先需要配置启动参数:
pimg_5c1771760f7a5.png
在 Program arguments 中添加:-hiveconf spark.master=local[2] -hiveconf spark.driver.bindAddress=127.0.0.1
在Working directory中添加:/Users/renweiming/spark/spark-2.x
这样启动的话还是会抛出异常信息:
pimg_5c1771cf3f9a6.png
解决缺少类的方法是 File -> Project Structure…
pimg_5c177201e0a2a.png
打开项目结构对话框
pimg_5c177250538bb.png
点击左下角的“➕”来增加Jar。一个小建议,如果你的Spark项目已经构建过了,那么其实Spark所需的jar都已经存在于本地的maven仓库里了,直接去仓库中,将对应的jar添加即可,就不再去下载jar包了。
常见的一些所需jar包:
| 异常 | 所需jar | 说明 |
| ———— | ————- | —————- |
| java.lang.ClassNotFoundException: com.google.common.cache.CacheLoader | com.google.guava | 该问题,一般是因为Scope设置的是Provided,修改为Compile即可,见表格下的截图 |
| 我擦,一下子启动起来了,以前配置过,以后遇到再补充吧,不给表现机会。。| …| …|
pimg_5c1774288b08d.png

带着问题学习

在Thrift server中,出现了如下异常:
18/12/17 15:01:00 INFO AbstractService: Service:HiveServer2 is stopped.
通过向上追日志,发现还有如下信息:
18/12/17 15:00:59 INFO HiveServer2: Shutting down HiveServer2
继续向上追:
18/12/17 15:00:59 ERROR SparkExecuteStatementOperation: Error running hive query as user : livepm
继续向上追:

从代码结构来看:
HiveServer2继承CompositeService,继承AbstractService。上面的异常就是从 AbstractService的stop()方法中报出来的。而这个方法调用是从如下的流程中调用的:HiveServer2.stop() -> CompositeService.stop() -> AbstractService.stop()。
所以,从代码上来分析,肯定是什么地方调用了stop方法。
在HiveServer的init方法中,添加了程序关闭的钩子函数:
pimg_5c1778027340a.png
所以肯定是主动调用了stop方法,或者是异常导致HiveServer关闭了。

开始看到什么记什么,稍后在整理
Thrift server的UI其实是在Spark的UI上增加了一个tab页,默认端口为4040,可以通过 spark.ui.port 进行自定义。Thrift Server的展示效果,是通过类 ThriftServerPage 和 ThriftServerSessionPage 来定义的。

在初始化HiveThriftServer的时候,会根据Hive的配置项hive.server2.transport.mode的值(可选的有http和binary)来决定生成哪种 ThriftCLIService。如果是http则生成 ThriftHttpCLIService,否则生成 ThriftBinaryCLIService。

启动流程

入口必然是HiveThriftServer2,HiveThriftServer2的main方法,在方法的开头就生成了一个HiveServer2对象,并调用它的parse方法得到一个ServerOptionsProcessorResponse对象,该对象中包含的ServerOptionsExecutor是 StartOptionExecutor对象。 – 但是这个服务好像就没有被调用,也就是说 HiveServer2并没有被调用。

接下来
创建了一个HiveThriftServer2对象,并调用了这个服务的init和start。
pimg_5c18ae94ed28f.png
然而start方法只是调用了父类的start,并将自身的启动标示started设置为true。父类(HiveServer2)的start方法,同样又调用了父类(CompositeService)的start方法。在这个类的方法中,它将serviceList中的service(通过addService添加的)分别调用start方法,然后又调用了父类(AbstractService)的start方法。在这个顶级抽象方法中,设置了服务的开始时间、检查了当前状态为INITED,并将服务的状态设置为 STARTED。
所以我们的关注点就落在了 serviceList中service是如何添加进去的。我们返回看一下HiveThriftServer2的init方法,就是上图中我们调用的init方法。
pimg_5c18aee6c6ac3.png
这样就添加了两个service,分别是 SparkSQLCLIService 和 ThriftCLIService。针对THriftCLIService,会根据 hive.server2.transport.mode 的配置项(binary 或 http)(我们公司使用的是默认值“binary”,也就是用的 ThriftBinaryCLIService )具体生成ThriftHttpCLIService或ThriftBinaryCLIService。
继续将此方法看完,在init的最后执行的是 initCompositeService(hiveConf),此方法是ReflectedCompositeService定义的一个方法,具体如下:
pimg_5c18b128e9a91.png
这个方法,要把它作为HiveThriftServer2的一部分来读,那么它的功能就是初始化父类ServiceList中的service:

得到父类(CompositeService)的serviceList,并对里面的service进行初始化(执行init方法)
设置hiveConf信息,通过反射设置。
检查服务的当前状态(确定必须是NOTINITED状态)并设置新的状态(INITED状态)。 – 为啥不直接调用AbstractService的init方法呢?

继续按着顺序来吧,看一看前面通过addService添加到serviceList中的每个service吧。

首先看看 SparkSQLCLIService 的init

为父类设置HiveConf
生成SparkSQLSessionManager,并将SparkSQLSessionManager通过addService添加到CompositeService的serviceList中。注意这里的CompositeService是SparkSQLCLIService自己的CompositeService。之后又调用了 initCompositeService。相当于将添加进去的service进行初始化。

SparkSQLSessionManager的初始化

首先,将hiveConf设置给父类(SparkSQLSessionManager的父类 SessionManager )。
然后,判断是否开启了 hive.server2.logging.operation.enabled 配置(默认为true),如果开启了,则执行initOperationLogRootDir来初始化操作日志目录。
然后,根据配置项 hive.server2.async.exec.threads 的值,生成一个固定线程数的线程池,并将线程池设置给父类的 backgroundOperationPool。
最后,生成一个 SparkSQLOperationManager 对象,并将此对象 设置给父类(SessionManager)的 operationManager。然后对 operationManager 进行初始化。

SparkSQLOperationManager的初始化

根据配置 hive.server2.logging.operation.enabled 的值(默认为true),来确定是否要启动一个 hive.server2.logging.operation.level 配置级别的Appender。

再看看 ThriftBinaryCLIService

首先,根据系统环境或配置项 hive.server2.thrift.bind.host 中得到需要绑定的host,如果没有则绑定本地地址。
然后,根据配置项 hive.server2.thrift.worker.keepalive.time 获取worker保持活跃的时长(默认为60秒)。
继续然后,从系统变量 HIVE_SERVER2_THRIFT_PORT 或配置项 hive.server2.thrift.port 中获取服务绑定的端口号,默认为10000。
最后,从配置项 hive.server2.thrift.min.worker.threads 和 hive.server2.thrift.max.worker.threads 中分别读取worker的最小线程数和最大线程数。
调用父类(AbastractService)的init方法。
worker活跃时长、最大线程数和最小线程数,用来初始化Service内部的线程池。

综上来看,其实就是一层一层的进行初始化,但是为啥不直接调用初始化方法呢?个人推测是覆盖性操作。

继续回到 HiveThriftServer2 初始化的位置,接下来就是启动 HiveThriftServer2,调用start方法,调用链如下:
HiveThriftServer.start() -> HiveServer2.start() -> CompositeService.start()(–此方法中会将serviceList中的所有service进行启动)
所以,接下来我们对 SparkSQLCLIService 和 ThriftBinaryCLIService 的start方法进行分析。

SparkSQLCLIService的启动

SparkSQLCLIService类中并没有定义start方法,但是它的父类CLIService中定义了。父类的启动很简单,首先调用父类(CompositeService)的start方法,相当于对serviceList中的各个service进行启动。然后自己内部生成一个 HiveMetaStoreClient,并链接获取默认的数据库(只是为了测试链接)。 至于serviceList中service的启动,需要结合上面的初始化来看看需要具体启动那些service。稍后上图。

ThriftBinaryCLIService的启动

ThriftBinaryCLIService自身也没有定义start方法,其父类(ThriftCLISService)定义了start方法。具体逻辑如下:调用父类(AbstractService)的start方法。然后,将自己作为参数生成一个Thread,并进行启动(因为ThriftCLIService实现了Runnable接口)。然而,ThriftCLIService自身只是定义了抽象方法run,具体实现由 ThriftBinaryCLIService 或 ThrfitHttpCLIService 来实现。因为我们使用的是 ThriftBinaryCLIService ,因此我们只对此类进行分析。
对于ThriftBinaryCLIService的启动,我们进行分析:
创建线程池、创建TTransportFactory、创建TProcessorFactory、创建serverSocket,这些是用来创建TThreadPoolServer.Args,然后使用它来创建 TThreadPoolServer,然后调用service的serve方法来启动服务。这里就会输出 Starting … on port … with … … worker threads的日志。

Spark 2.11 shuffle data read and write

发表于 2019-01-17   |   分类于 spark 2.11

本文记录对Spark Shuffle过程中对数据读写的梳理。本文将先从数据的读入作为切入点,也就是从ShuffleReader的源码作为切入,然后在逐步的展开。因此本文将从读取和写入两个角度来进行分析,最后在用流程图的方式将读取和写入整合起来。

数据读取

ShuffleReader的实现类

ShuffleRead是定义在org.apache.spark.shuffle包中的一个接口(理解为接口吧,其实是trait),它只定义了一个方法read(),并且这个方法返回一个Iterator(迭代器)。对于ShuffleReader的实现类,只有一个类 BlockStoreShuffleReader,与ShuffleReader定义在同一个包中。

ShuffleReader的read()方法的实现

read()基本的流程图参考如下如下
image.png

在read()方法中,首先生成一个ShuffleBlockFetcherIterator对象。该对象的详细介绍会在后面进行。
生成的ShuffleBlockFetcherIterator本身就是一个Iterator,但是需要注意这个Iterator中包含的数据都是进行了加密和压缩的,我们在之后分析ShuffleBlockFetcherIterator的时候会了解到。
接着,我们会得到序列化器的实例,将ShuffleBlockFectherIterator中的数据进行解析。
如果Shuffle的依赖中定义了聚合器,则需要进行一次聚合操作(将上面的迭代器中的数据再次处理):如果是map端聚合,则使用聚合器的 combineCombinersByKey来处理,否则使用combineValuesByKey来处理。
最后,创建一个ExternalSorter并将上一步的数据全部插入到这个sorter中,并使用这个ExternalSorter创建一个CompletionIterator返回。
大家在这里会看到聚合器和ExternalSorter,这就实现了Shuffle的基本操作。

ShuffleBlockFetcherIterator的实现与流程

生成这个对象需要一系列参数:

参数名 参数类型 参数的意义和作用
context TaskContext Task的上下文
shuffleClient ShuffleClient 读取Shuffle数据的Client
blockManager BlockManager Block的管理器,可以获取Block的相关信息
blocksByAddress Seq 根据BlockManagerId聚合的Block列表
streamWrapper 函数 用于将BlockId和对应的InputStream进行包装的方法
maxBytesInFlight Long ——
maxReqsInFlight Int 同时进行请求发送的最大数
maxBlocksInFlightPerAddress Int 同时拉取某个远程地址的block的请求数
maxReqSizeShuffleToMem Long 单个请求拉取数据的可用缓存,如果超出这个缓存则使用临时文件来拉取数据
detectCorrupt Boolean —

通过以上参数,生成了我们的ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator实现了Iterator(迭代器)接口,同时也实现了TempShuffleFileManager(临时Shuffle文件管理器)。实现Iterator就意味着它有迭代器的相关方法(next、hasNext);实现TempShuffleFileManager,就意味这它能够创建临时Shuffle文件(createTempShuffleFile)、能够注册临时文件的清理(registerTempShuffleFileToClean)。接下来我们分别按照接口的实现来分析,首先看看比较简单的TempShuffleFileManager:

TempShuffleFileManager的实现

实现这个接口,我们需要实现两个方法 createTempShuffleFile 和 registerTempShuffleFileToClean。

createTempShuffleFile

这个方法的实现很简单,就是调用BlockManager的DiskBlockManager的createTempLocalBlock来创建一个临时本地Block并返回。

registerTempShuffleFileToCliean

这个方法的实现也很简单,就是将文件添加到一个名为 shuffleFileSet 的集合中,等到ShuffleBlockFetcherIterator中cleanup方法被执行的时候,就会将 shuffleFileSet 集合中的文件统一删除。

Iterator的实现

我们知道,ShuffleBlockFetcherIterator 最重要的功能是获取数据,这个过程会涉及到从远程拉取数据。

hasNext

方法实现很简单,就是判断已经处理的block数量是否达到需要拉取的block的数量(numBlocksProcessed < numBlocksToFetch)。

next

next方法的实现就复杂了,而且涉及到很多方法之间的调用,我们首先看看流程图,总体说明一下流程,然后在具体分析细节。
image.png
这个流程图说明了 ShuffleBlockFetcherIterator 内部获取数据时几个方法之间的调用关系,next是对用户的接口,用户通过这个方法获取数据。但是我们都应该想到,shuffle数据是很庞大的,所以肯定不可能一次性都将数据拉取过来缓存,只能根据blockId逐步拉取,fetchUpToMaxBytes就是实现逐步拉取数据的逻辑,内部实现了对数据拉取的控制。当需要拉取数据的时候fetchUpToMaxBytes就会调用send方法,send方法是对sendRequest的一层封装,除了调用sendRequest之外,就是要记录每个远程地址当前正在传输Block的数量。sendRequest就是使用ShuffleClient从远程地址拉取数据的具体实现了。
为了便于理解,我们根据上面流程图的倒序进行分析。

sendRequest

sendRequest是远程拉取数据的实现。从代码上来看,代码主要分成三个部分:记录状态信息(为fetchUpToMaxByte提供逻辑处理的数据)、生成BlockFetchingListender(调用fetchBlocks的时候需要这个监听器作为回调)以及使用shuffleClient拉取数据。
基本的流程图如下:
image.png
对于ShuffleClient的fetchBlocks方法的实现我们稍后会深入分析,但是这里需要意识到的是这个方法拉取的是多个block。

send

send方法就是对sendRequest的包装,除了调用sendRequest方法之外,会记录当前远程地址正在传输Block的个数,也画一个图吧:
image.png
就是调用sendRequest,然后设置缓存的值。

fetchUpToMaxBytes

上面我们也简单的提了一下,这个方法就是读取最大限度的数据(但是要整读,不会读取半个block的数据,要么不读,要读取完成的block)。
这个方法的实现其实也比较简单,基本的思路就是在准备发送请求之前,先验证一下是否可以发送,如果可以则调用send方法,如果不可以,则将请求加入到延迟队列中。所以从上面的话中,大家可以了解到,这个数据会操作两个队列 fetchRequests 和 deferredFetchRequests,从代码来看,程序会先从deferredFetchRequest队列中拿请求处理,如果这个队列中没有请求或请求都不符合发送才会去fetchRequest队列中拿请求,实际处理刚开始的时候deferredFetchRequest必然是空的,只有当处理fetchRequest中的请求不符合发送条件的时候才会加入到deferredFetchRequest中,但是这两个队列的数据结构还是不一样的,fetchRequest存放的是FetchRequest(BlockManagerId, Seq[(BlockId, Long)])对象,也就是存储的是BlockManagerId->Block列表的映射关系,BlockManagerId可以简单的理解为存储Block的远程地址;而deferredFetchRequest,存放的是BlockManagerId -> Queue[FetchRequest]的映射,也就是说在添加到deferredFetchRequest的时候根据BlockManagerId进行整合汇总了。
此方法的流程图如下:
image.png
从图中,我们也看到了在这个方法对每个地址正在拉取block的数量(numBlocksInFlightPerAddress)、正在拉取数据的数据量(bytesInFlight)和正在拉取数据的请求个数(reqsInFlight)的使用。
在进入fetchUpToMaxBytes方法后,首先会遍历deferredFetchRequests集合中的数据,这里需要注意的是,这个数据根据BlockManagerId进行了汇总,key是BlockManagerId,value是一个FetchRequest的Queue。首先调用isRemoteBlockFetchable方法验证队列是否是可拉取的(主要判断当前最大同时数据拉取量和最大同时请求数)。然后使用isRemoteAddressMaxedOut方法验证要发送的请求是否达到了远程地址最大同时拉取数量。如果验证都符合条件,则将FetchRequest交给send方法来进行请求的发送。

next

next方法是Iterator接口的方法,也是用户用来获取数据的方法。
因为我们已经知道了sendRequest方法的BlockFetchingListener对象在收到数据拉取成功或失败后会向results(LinkedBlockingQueue)中写入一个SuccessFetchResult或FailedFetchResult,如果是SuccessFetchResult,则会在其中包含一个ManagerBuffer,也就是我们想要的数据。所以next方法获取数据就是从results中拉取数据,又因为results是阻塞队列,所以当results中没有数据的时候,就会被take方法所阻塞,直到拿到数据进行处理。
如果从results拿到的数据为FailedFetchResult,则会抛出异常信息,导致拉取失败。
如果从results拿到的数据为SuccessFetchResult,那么我们就可以进行处理了,首先,因为这个block已经拉取完毕了,所以相关计数器(如正在拉取数据的数据量bytesInFlight)的计数需要进行更新。
对相关计数器操作完成,就会从ManagerBuffer中获取输入流,然后调用serializerManager.wrapStream方法进行数据流的加密和压缩。接着会根据配置(数据流确实被加密或压缩了 、数据量较小)来决定是否要将输入流转换为ChunkedByteBufferInputStream。
最后,返回blockId以及使用BufferReleasingInputStream包装的输入流(包装后可以带调用ShuffleBlockFetcherIterator中的清理方法来进行清理工作)。
image.png

initialize

上面的介绍的这些方法都是在从远程节点拉取数据。其实ShuffleBlockFetcherIterator也会读取本地的数据,但是在生成ShuffleBlockFetcherIterator的时候会对Block进行拆分。而这一切是从initialize方法开始的。
image.png
方法的逻辑很简单,显示注册一个Task完成事件,用来调用cleanup方法进行清理工作。然后调用splitLocalRemoteBlocks()方法,将远程Block和本地Block进行拆分,并将拆分后的远程Block打散放入fetchRequest队列中(供fetchUpToMaxBytes()使用)。接着分别调用fetchUpToMaxBytes()和fetchLocalBlocks()方法。

splitLocalRemoteBlocks

方法的逻辑也相对简单,遍历blocksByAddress(类型为Seq[(BlockManagerId, Seq[(BlockId, Long)]],BlockManagerId代表了一个远程地址,这个地址上存折多个Block,这里以(BlockId,Block的size)来表示)(这里的blocksByAddress是通过mapOutputTracker(通过SparkEnv.get.mapOutputTracker得到)的getMapSizesByExecutorId方法得到的)中的数据。
遍历数据时,会拿到BlockManagerId和这个BlockManager上的Block列表。通过BlockManagerId,能够得到executorId,对这个executorId与本地SparkEnv中的BlockManager中的executor进行比较,就可以判断这个BlockManagerId是否是本地的了(BlockManager中的executorId就相当于BlockManager的唯一ID了)。如果是本地的,将Block列表加入到localBlocks集合中(对于size为0的Block直接丢掉,无需拉取),后面fetchLocalBlocks方法会对这个集合进行处理。对于远程,需要对远程BlockManager中的每个Block进行遍历,这样做的目的是为了拆分FetchRequest(可能会将一个BlockManager中的多个Block分不同的请求来拉取)。

image.png

从代码中对于FetchRequest的拆分,我们可以了解到在配置spark.reducer.maxSizeInFlight参数的时候,这个参数的最小值应该为一个Block的大小,否则后面没法发送请求,因为任何一个数据的大小都会超过限制,而FetchRequest的拆分是以Block为单位的,一个FetchRequest最少含有一个Block。

fetchLocalBlocks

fetchLocalBlocks的逻辑很简单,基本流程如下:
image.png
此方法就是遍历localBlocks中的数据,并调用本地的BlockManger获取数据,然后将包装为一个FetchResult放到results中,供next方法使用。
如果在获取本地Block时发生异常,则推送results时推送的是FailureFetchResult,否则推送的是SuccessFetchResult。

至此,ShuffleBlockFetcherIterator的处理逻辑就介绍完了。

shuffleWriter到shuffleReader的调用链

我们已经知道了BlockStoreShuffleReader作为ShuffleReader进行数据读取,read方法返回Iterator对象。那么调用这个read方法的流程是什么样的呢?简单的流程图如下:
image.png
基本流程就是在执行task的时候,会使用ShuffleWriter来写数据,写数据的时候就会调用RDD的iterator来读取数据。在使用iterator读取数据的时候会根据存储级别来确定调用getOrCompute()方法还是computeOrReadCheckpoint()方法,当RDD的存储级别不为NONE的时候就会调用getOrCompute()方法。但是这两个方法都会调用到computeOrReadCheckpoint()方法,然后调用ShuffleRDD的compute()方法,最终调用到了ShuffleReader的read()方法。所以接下来我们要看看ShuffleWriter的流程。

ShuffleWriter

ShuffleWriter的获取,ShuffleWriter是调用ShuffleManager(SparkEnv.get.shuffleManager)的getWriter方法得到的。调用getWriter方法的时候会传递dep.shuffleHandle作为参数,因为方法中会根据shuffleHandle的类型,生成不同类型的ShuffleWriter。
image.png
接下来,我们具体分析一下ShuffleWriter的write方法(以简单的SortShuffleWriter为例)。

SortShuffleWriter.write

此方法的逻辑也比较简单。首先生成ExternalSorter,根据dependency.mapSideCombine来确定生成的ExternalSorter是否含有聚合器(如果为true则含有)。
然后调用sorter.insertAll将read返回的Iterator插入到上面生成的ExternalSorter中。
接着,使用IndexShuffleBlockResolver(其中包含BlockManager,因此可以管理Block),根据shuffleId和partitionId创建shuffle的数据文件,同时创建这个数据文件的临时文件(在数据文件后面加一个随机的后缀)。
然后调用sorter.writePartitionedFile方法,将sorter中的数据写到上面的临时文件中。接着根据临时文件写索引文件,并将临时文件调整为正式文件,通过调用shuffleBlockResolver.writeIndexFileAndCommit方法来实现(此方法中会对数据文件和索引文件进行验证)。
write方法比较简单,对于分类器ExternalSorter,我们在ShuffleReader和ShuffleWriter中都看到了,shuffle过程数据的混洗也是这样完成的吧。
write方法中使用shuffleBlockResolver做了两件事:获取数据文件(getDataFile)和写索引文件(writeIndexFileAndCommit)。接下来我们对这两个方法也一并分析一下。

IndexShuffleBlockResolver.getDataFile

image.png
方法的实现很简单吧。就是调用BlockManager中的DiskBlockManager去获取文件,如果没有,则生成一个文件并返回(关于DiskBlockManager操作文件的逻辑可以参考内存分析章节)。

shuffleBlockResolver.writeIndexFileAndCommit

这个方法逻辑稍微复杂一些,先是根据shuffleId和partitionId创建索引文件和索引临时文件。然后根据sorter.insertAll方法返回的文件每个分区的长度,写到索引临时文件中。索引文件指定了数据文件中每个partition的起止offset(sorter.insertAll方法,我们有机会在深入分析)。到这里需要注意,刚刚写的数据文件和索引文件,都是写入到临时文件中的,因为数据文件和索引文件可能已经存在了(因为一个task可能有多个尝试在同时执行),所以接下来就会对索引文件和数据文件(注意这里不是临时文件)进行验证(调用checkIndexAndDataFile方法)。验证成功,则表示有task的其他尝试已经完成了文件的写入,直接将索引临时文件和数据临时文件删除即可;如果验证失败,则将已有的数据文件和索引文件删除,将我们上面生成的索引临时文件和数据临时文件重命名为正式的索引文件和数据文件。
缺少一个流程图。。。。。。TODO

至此,数据的写入逻辑也就介绍完了,另外两种Writer的write实现,有机会再补充。

ShuffleClient

在sendRequest方法中,我们调用了ShuffleClient对象来拉取Blocks。有个细节肯定没有忘记,就是会根据此次请求要拉取数据的大小来决定是否会传递TempShuffleFileManager,这个TempShuffleFileManager有什么用,我们会在分析的时候看到它的作用。另外,还有一个问题:ShuffleClient是如何得到的,具体的实现类是哪个?
从生成ShuffleBlockFetcherIterator是,我们知道ShuffleClient是调用BlockManager的shuffleClient方法得到的。
image.png
代码逻辑很简单,根据配置“spark.shuffle.service.enabled”来确定使用ExternalShuffleClient作为ShuffleClient,还是使用BlockTransferService。因为我们开启了此参数,所以我们会针对ExternalShuffleClient进行分析。生成ExternalShuffleClient的时候会需要一个SparkTransportConf

整个过程中所使用的配置

配置参数 默认值 参数作用 使用位置
spark.reducer.maxSizeInFlight 48M 控制该节点可以同时拉取多大量的数据(所有请求同时拉取数据的字节数) 1、isRemoteBlockFetchable方法中验证正在拉取的数据的数据量。2、在splitLocalRemoteBlocks方法中,用于拆分FetchRequest(当一个地址上的多个Block的总大小超过该值的五分之一的时候,就把这些Block拆到多个FetchRequest中)。
spark.reducer.maxReqsInFlight Int.MaxValue 控制节点同时可以发送的请求数,超过这个数的请求,将被放到deferredFetchRequests中 isRemoteBlockFetchable方法中验证当前正在拉取数据的请求数
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue 一个远程节点,可以同时拉取Block的个数 1、isRemoteAddressMaxedOut方法中检查某个远程地址上正在拉取的block的数量是否超出最大值。2、在splitLocalRemoteBlocks方法中,当一个地址(BlockManagerId)上拥有的Block个数超过该值时,为了可以提交send方法,需要将这些Block拆分到多个FetchRequest中
spark.shuffle.detectCorrupt true 如果此参数为true,会将ManagerBuffer得到的输入流转换为ChunkedByteBufferInputStream类型 在shuffleBlockFetcherIterator的next方法中
spark.shuffle.compress true shuffle数据是否进行压缩 在next方法调用serializerManager.wrapStream方法时会验证
spark.shuffle.service.enabled false 是否启用shuffleService 如果启用了,则生成ExternalShuffleClient对象作为ShuffleClient,否则使用BlockTransferService.我们开启了此参数

一些有趣的东西

去哪里可以得到shuffle数据的分布?

答案是MapOutputTracker,通过SparkEnv.get.mapOutputTracker就可以得到MapOutputTracker对象。
比如:mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) 就可以得到shuffleId阶段,根据ExecutorId聚合后的Block信息。

读写流程的整合

image.png

spark-2.11-storage

发表于 2018-12-07   |   分类于 spark 2.11

Spark存储管理在Spark执行application时担负这数据的传递、存储的重要职责。因此了解Spark的存储机制对于理解Spark的数据操作和性能的优化有着很重要的作用,本文以粗粒度方式来探究一下Spark的存储(最主要是内存)。由于在Spark中任务的执行、数据的传输均是发生在Executor端,因此本文只关注Executor端的存储操作。

1、内存模式 On-heap与Off-heap

因为Executor是运行在JVM上的,所以Executor最直接的就是操作On-heap内存,但是除此之外,Spark还引入了Off-heap内存的使用,使Spark可以直接操作JVM之外的系统内存,并对此操作进行了优化。

该图中关于task共享Executor内存的较少我们将在后续介绍,这里只需要关注On-heap和Off-heap的区别即可。

1.1、堆内内存 On-heap

堆内内存的大小通过参数 –executor-memory 或 配置spark.executor.memory来控制。Spark对堆内内存的管理是一种计数上的管理,因为对对象的创建和销毁实际是由JVM来具体操作的,Spark无法准确的控制这些,因此它只是从计数(空间使用量)的角度来管理堆内内存。

1.2、堆外内存 Off-heap

为了进一步优化内存的使用进而提高Shuffle的执行效率,Spark引入了堆外内存(Off-heap),使得Spark可以使用Executor上的系统内存。默认堆外内存是不可用的,可以通过 spark.memory.offHeap.enabled来进行开启。通过JDK Unsafe API,Spark能够直接操作系统内存,因此可以精确的控制堆外内存的申请和销毁。

1.3、内存的统一管理

Spark对于内存的管理使用统一的抽象接口MemoryManager。它负责Spark的内存申请、释放以及不同用途内存之间的转换。对于内存管理的实现,Spark主要基于两种内存管理:静态内存管理模式(StaticMemoryManager)和统一内存管理模式(unifledMemoryManager)。从Spark 2.0开始,默认使用的内存管理模式为 统一内存管理模式(unifledMemoryManager)。如果想要使用静态内存管理模式,可以将 spark.memory.useLegacyMode 配置设置为true。

2、内存的用途分类

从内存的使用来看,Spark对内存的使用主要在两个方面:数据执行和数据存储,所以我们堆内存的划分也就分为执行内存和存储内存。执行内存主要用于shuffle、join、sorts和aggregation的消耗,而存储内存主要用于数据的缓存和集群内数据的传输。在Spark内存管理中,执行内存和存储内存的大小是通过参数 spark.memory.storageFraction 来指定的,执行内存和存储内存之和就是Executor的可用内存(对于On-heap内存,含有other部分)。

2.1、堆内内存中执行内存与存储内存的计算

对于堆内内存,我们首先要介绍几个概念:

概念 解释/取值
系统内存 可以理解为JVM的内存,如果设置了spark.testing.memory,则使用,否则Runtime.getRuntime.maxMemory
系统预留内存 系统预留的内存,依次spark.testing.reservedMemory > spark.testing则为0 取值,默认为300(300 1024 1024)
最小系统内存 系统预留内存的1.5左右
可用内存 系统内存 - 预留系统内存
最大内存 可以被Spark使用的内存(执行内存和存储内存之和)。可用内存 * spark.memory.fraction(默认为0.6,我们集群的配置为0.7)

最大内存又分成了存储内存和执行内存。存储内存在最大内存中的占比通过spark.memory.storageFraction(默认为0.5)配置来指定,其余的为执行内存。这里额外介绍一个细节,如果 spark.executor.memory配置的值小于 最小系统内存,executor是无法启动的。

2.2、堆外内存中执行内存和存储内存的计算

堆外内存的大小是通过spark.memory.offHeap.size配置指定的。堆外内存比较简单,它不存在其他部分的内存分配,内部直接分为存储内存和执行内存。堆外存储内存所占比例同样通过配置spark.memory.storageFraction(默认0.5)来指定。

对于堆内存储内存和堆外存储内存,彼此之间是相互独立的,执行内存也是如此。

2.3、存储级别

我们已经知道了内存模式分为Off-heap和On-heap,而根据内存的用途有分为存储内存和执行内存。因此对于数据的存储,Spark从一下几个因素定义了存储级别(StorageLevel):

  1. 磁盘/内存
  2. 堆内/堆外
  3. 序列化/不序列化
  4. 有副本/没有副本

有了存储级别,就能够明确的说明数据存储的位置、数据存储的方式以及数据存储的个数。

3、静态内存管理与统一内存管理

最初Spark的采用的是静态内存管理,在2.0的版本中,Spark开始默认使用统一内存管理来进行内存管理。静态内存管理和统一内存管理的区别,可以简单的从执行内存和存储内存能否相互借用来区别。因为我们系统也是使用默认的统一内存管理来对内存进行管理,顾暂时不对静态内存管理进行理解。

3.1、统一内存管理

相对于静态内存,统一内存增加了动态占用机制的优化,其规则如下:

1、通过配置项spark.memory.storageFraction,对存储内存和执行内存进行基本值的划分。
2、当存储内存不够、执行内存充足时,可以增加存储内存的容量,减少执行内存的容量。反之亦然。
3、当存储不够且执行也不充足时,存储数据落盘。执行不够且存储也不够时,执行阻塞或失败。
4、当存储占用执行时,执行可要求存储归还,存储不够的可以落盘;当执行占用存储时,存储无法要求执行归还,只能删除数据或落盘。

3.2 动态占用机制的实现

上面我们提到了统一内存管理的动态占用机制,它可以更加充分的使用内存,那么这种机制是如何实现的呢?上面我们也说过,Spark其实是无法精确操作内存的,而是使用了类似计数管理的方式来实现的。
因此,在Spark的底层实现中,它为每种内存都创建了与之对应的内存池(执行内存池和存储内存池,但是存储模式又分为堆内和堆外,所以共有四种内存池),内存池记录了对应内存的使用量和容量。

3.2.1 MemoryManager

对于内存池的封装,是由 MemoryManager来实现,在其内部维持着四种内存池的引用。

其中只有相同内存模式的不同内存之间可以动态占用,如:OnHeapStorageMemoryPool只可以和 OnHeapExecutionMemoryPool 相互占用。另外需要注意的是,内存的总大小(执行内存和存储内存之和)一旦确定是无法修改的,虽然可以调整某个内存的大小,但是总的大小是不变的。
MemoryManager(UnifiedMemoryManager)主要的职责就是根据需要调整各自内存池的容量、计算各自内存池的当前使用量以及分配使用量。

4、存储内存的管理

存储内存最主要的使用就是数据缓存(RDD进行持久化保存)和集群内的数据传输(数据的广播)。而且我们前面也介绍了存储级别,还需要介绍一个其他的概念:Block。对于Block的理解,可以先简单的将数据的parition理解为一个Block,但是在存储过程中Block是由类型的(通过BlockId进行验证):
pimg_5c07c4739522a.png
从上图可以看出,BlockId由众多的子类,而属于哪种类别的BlockId,就是通过字符串模式匹配来决定的。
这里我们为什么要介绍Block呢?因为数据缓存就是以Block方式存储的。
在Spark中Storage模块负责Spark在计算过程中产生的数据,对数据的读写进行了统一的封装(包括从内存、磁盘、本地、远程)。在代码架构上,BlockManager分为Master和Salve。Dirver上运行的是Master,Executor上运行的是Slave,两者之间相同通信对数据块(Block)进行管理。

4.1、 具体的实现

在MemoryStore中,保持一个entries对象,它是一个LinkedHashMap[BlockId, MemoryEntry[_]]对象。MemoryEntry是一个接口,它有两个实现:DeserializedMemoryEntry 和 SerializedMemoryEntry,分别处理非序列化数据和序列化数据的保存。当由此,也就明白了存储级别(StorageLevel)中序列化和非序列化的意义了。当数据向内存中缓存数据时,其实就是将数据保存到enties中,但是与普通生成兑现不太一样,他会以连续的内存来保存,也就是说一个Block内的数据,从内存上来看是连续存储的(序列化的数据很好理解,序列化之后,对象就是一串字节数,但是对于非序列化的对象,其内部会有一个转换操作)。

5、执行内存的管理

执行内存最主要的使用就是shuffle、sorts、aggregate等操作的时候被使用。而排序和聚合其实都是以shuffle的结果来进行操作然后写出数据,所以我们先从Shuffle的存储进行分析。

5.1、 Shuffle执行内存的使用

shuffle操作是RDD之间的一种数据转换,从上一个RDD中读取,写入到下一个RDD中,因此我们将从读写两个方面来分析一下:

5.1.1、 shufflerReader

Spark的shuffle操作是由ShuffleManager(由子类SortShuffleManager进行实现)进行操作的。ShuffleManager要读取数据就需要获取Reader,从而得到BlockStoreShuffleReader,BlockStoreShuffleReader调用read()方法进行数据读取。ShuffleManager可以通过配置项spark.shuffle.manager进行设置(默认为sort,可选的值有sort和tungsten-sort):

spark.shuffle.manager的取值 所代表的类
sort org.apache.spark.shuffle.sort.SortShuffleManager
tungsten-sort org.apache.spark.shuffle.sort.SortShuffleManager

这里需要引入以概念:ShuffleClient,它是实际拉取数据的客户端。在Spark内部存在两种ShuffleClient:BlockTransferService和ExternalShuffleClient。如果配置项 spark.shuffle.service.enabled 为true(默认为false),则启用ExternalShuffleClient(比如我们的集群,就启用了这个配置)。
在生成ExternalShuffleClient的需要SparkTransportConf,该配置有两个比较重要的配置:

配置项 意义 取值
spark.shuffle.io.serverThreads stage之间TransServer的线程数 用户设定,默认与可用的core的数量相同
spark.shuffle.io.clientThreads 用户设定,默认与可用的core的数量相同

可用core的数量为:用户指定core数、运行时可用core数 以及 数字8 中最小的那个值(如果用户指定的数不是0,则使用用户指定的数和8中最小的值,否则就是可用core数和8中最小的那个)。我们集群没有对此进行配置,因此会使用JVM可用的core数进行设置,但是不会超过8个。
ExternalShuffleClient中重要的方法就是fetchBlock方法。在fetchBlock方法,会创建连接到目标host和port的TransportClient,然后利用这个client生成OneForOneBlockFetcher来拉取指定executor上(通过参数execId)指定的block(通过blockIds指定)。>_< 到这里都没有看到内存的使用。。。醉了
突然一个不小心,原来OneForOneBlockFetcher中使用了一个参数 TempShuffleFileManager,它是一个接口,实现类为 ShuffleBlockFetcherIterator。这个类中有一个方法 createTempShuffleFile()。那么我们就看看,OneForOneBlockFetcher 是否将数据写到了临时文件吧(山路十八弯呀)。通过跟踪代码,果然是将远程的数据写入到个临时文件中。但是当数据写完之后,这个文件会被用来生成一个ManagedBuffer(具体类为FileSegmentManagerBuffer),对于这个ManagedBuffer的操作会交给listener进行处理,这个linstener就又指向了ShuffleBlockFetcherIterator中的 BlockFetchingListener,调用它的onBlockFetchSuccess方法。在新的方法中,ManagerBuffer作为一个SuccessFetchResult对象被推送到results中(一个LinkedBlockingQueue队列)。我们已经知道这个方法是在 ShuffleBlockFetcherIteraotr中,而这个类本身就是Iterator,所以对上面的队列的读取,就发生在Iterator的next()方法中。继续回到生成ShuffleBlockFetcherIteraotr的地方BlockStoreShuffleReader.read()中。在read方法中,又继续对数据进行了处理,怎么处理的呢,当然从字节流被转换为对象(进行解序列化操作),但是read返回的依旧是一个迭代器(Iterator)。因为shuffle操作肯定对需要一种聚合手段,这里采用了ExternalAppendOnlyMap进行聚合操作。如果还需要排序,则使用进一步使用ExternalSorter对象进行操作。这两个类好复杂,慢慢在看(也就是这两对象的操作会占用内存)。

5.1.2、ShuffleWriter

shuffleWriter的调用是在ShuffleMapTask的runTask中触发的(这也很好理解,只要在执行task结束的时候才需要写数据呀),而且我们也知道,对于Task只分为两种类型ShuffleMapTask和ResultTask,因为是了解shuffle部分,所以我们只关注ShuffleMapTask,至于ResultTask以后再继续。
至于获取ShuffleWriter,是根据ShuffleDependency中shuffleHandle的类型所有决定的,不同的handler会生成不同的Writer:

handler类型 与之对应的writer
unsafeShuffleHandle UnsafeShuffleWriter
bypassMergeSortHandle BypassMergeSortShuffleWriter
其他 SortShuffleWriter

我们选择一个较为简单的Writer吧,就看SortShuffleWriter。对于Writer来说,最重要的方法必然是write。于是我们就在方法中看到了获取数据文件、生成BlockId、写文件的操作。
pimg_5c09e4c7151a3.png
从目前来看,shuffle的写操作,写的是文件,而非内存,但是从文档或其他人的文章都提到有写内存的,应该是我还没有看到,会后续补充。

5.2、task执行内存的分配

Executor内部是以多线程的方式执行task,要启动一个task其实就是将TaskRunner放到Executor内部的线程池中执行。既然,task是在Executor中运行,多task在运行期间,执行内存是如何分配的呢?Spark在执行内存池中维持了一个HashMap用来记录每个task所占用的内存。每个task允许使用的内存范围为 maxPoolSize/2N ~ maxPoolSize/N(N为当前活跃的Task数, maxPoolSize是执行内存池的最大空间),注意该限制只是在申请资源的时候验证,当申请资源的时候,如果可以分配给task的内存小于最小值,则会使申请资源的操作进入等待状态,等到有其他任务释放内存的时候,会被再次唤醒。
pimg_5c08aca616f57.png

12…4
baimoon

baimoon

Baimoon's blog

66 日志
24 分类
30 标签
GitHub
Links
  • xrange
© 2016-07 - 2020 baimoon
由 Hexo 强力驱动
主题 - NexT.Muse