spark-2-11-BlockManager

SparkEnv creates both BlockManagerMaster and BlockManagerMasterEndpoint. Constructing the BlockManagerMaster requires a reference to the BlockManagerMasterEndpoint (what is it used for?).
BlockManager is also created in SparkEnv. Its constructor needs a reference to the BlockManagerMaster (what is it used for?) as well as a BlockTransferService (what is it used for?).

A BlockManagerMaster runs on the driver and on every executor.

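Let's start with the implementation of BlockManagerMaster.

BlockManagerMaster is constructed with an RpcEndpointRef that keeps it in contact with the driver. Every method of BlockManagerMaster described below goes through this RpcEndpointRef.

The RpcEndpointRef in BlockManagerMaster is used to send messages to the driver, and the driver then relays them to the other block managers where necessary.
Take the stop method of BlockManagerMaster as an example: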

if (driverEndpoint != null && isDriver) {
  tell(StopBlockManagerMaster)
  driverEndpoint = null
  logInfo("BlockManagerMaster stopped")
}

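The StopBlockManagerMaster message is sent to the driver only when the BlockManagerMaster being stopped is the one on the driver node.

The tell method sends a message through the askSync method of the BlockManagerMaster's RpcEndpointRef and expects true in reply; otherwise it throws a SparkException.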

private def tell(message: Any) {
  if (!driverEndpoint.askSync[Boolean](message)) {
    throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
  }
}

Note that the method called here is askSync, a synchronous request. RpcEndpointRef also offers ask, which runs asynchronously.
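The difference between the two call styles can be sketched with plain Scala futures. This is only a toy model: ToyEndpointRef, AskDemo and the 5 second default timeout are names and values made up for the illustration, not Spark's RpcEndpointRef.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for an RPC endpoint reference (not Spark's class).
class ToyEndpointRef(handler: Any => Any) {
  // Asynchronous: returns at once with a Future of the reply.
  def ask[T](message: Any): Future[T] = Future(handler(message).asInstanceOf[T])

  // Synchronous: blocks the caller until the reply arrives or the timeout expires.
  def askSync[T](message: Any, timeout: FiniteDuration = 5.seconds): T =
    Await.result(ask[T](message), timeout)
}

object AskDemo {
  // A toy handler that only accepts the message "stop".
  val ref = new ToyEndpointRef({
    case "stop" => true
    case _      => false
  })

  // Same shape as tell: a false reply is turned into an exception.
  def tell(message: Any): Unit =
    if (!ref.askSync[Boolean](message)) {
      throw new IllegalStateException("endpoint returned false, expected true")
    }
}
```

Calling AskDemo.tell("stop") returns normally, while any other message raises the exception, which mirrors how tell treats a false reply.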

Having covered stop, we now go through the remaining methods in order:

removeExecutor

Removes an executor by sending a RemoveExecutor message to the driver.

def removeExecutor(execId: String) {
  tell(RemoveExecutor(execId))
  logInfo("Removed " + execId + " successfully in removeExecutor")
}

removeExecutorAsync

Removes an executor asynchronously. It works like removeExecutor but uses the asynchronous method of the RpcEndpointRef.

def removeExecutorAsync(execId: String) {
  driverEndpoint.ask[Boolean](RemoveExecutor(execId))
  logInfo("Removal of executor " + execId + " requested")
}

registerBlockManager

Registers the id of a BlockManager with the driver (is that the right way to read it?). The BlockManagerId passed in does not contain topology information. The BlockManagerId returned by the registration is the updated one, and it is used to update the data held on the BlockManagerMaster side.

def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  val updatedId = driverEndpoint.askSync[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}

The implementation sends a RegisterBlockManager message to the driver through the RpcEndpointRef. The message carries the BlockManagerId, the maximum on-heap memory, the maximum off-heap memory, and the slaveEndpoint (an RpcEndpointRef).

updateBlockInfo

updateBlockInfo sends the driver the latest information about a block: its storage level, memory size and disk size.

def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {
  val res = driverEndpoint.askSync[Boolean](
    UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
  logDebug(s"Updated info of block $blockId")
  res
}

It is implemented by sending an UpdateBlockInfo message to the driver through the RpcEndpointRef.

getLocations

Returns the locations of the given BlockId.

def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
}

It is implemented by sending a GetLocations message to the driver through the RpcEndpointRef.

getLocations

Returns the locations of several BlockIds (an overload of the method above).

def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
  driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
    GetLocationsMultipleBlockIds(blockIds))
}

It is implemented by sending a GetLocationsMultipleBlockIds message to the driver through the RpcEndpointRef.

contains

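Checks whether the given block is known to the master, that is, whether the driver has at least one location for it.

def contains(blockId: BlockId): Boolean = {
  !getLocations(blockId).isEmpty
}

It asks the driver, through the RpcEndpointRef, for the locations of the BlockId; if any location comes back, the block exists.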

getPeers

Gets the ids of the other BlockManagers in the cluster from the driver.

def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

It is implemented by sending a GetPeers message to the driver through the RpcEndpointRef.

getExecutorEndpointRef

Returns the RpcEndpointRef of the given executor.

def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
  driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}

It is implemented by sending a GetExecutorEndpointRef message to the driver through the RpcEndpointRef.

removeBlock

Removes the given block. Only blocks that the driver knows about can be removed.

def removeBlock(blockId: BlockId) {
  driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
}

It is implemented by sending a RemoveBlock message to the driver through the RpcEndpointRef.

removeRdd

Removes all blocks that belong to the given RDD.

def removeRdd(rddId: Int, blocking: Boolean) {
  val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}

It is implemented by sending a RemoveRdd message to the driver through the RpcEndpointRef. The second parameter, blocking, decides whether to wait for the result.

removeShuffle

Removes all blocks that belong to the given shuffle.

def removeShuffle(shuffleId: Int, blocking: Boolean) {
  val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}

It is implemented by sending a RemoveShuffle message to the driver through the RpcEndpointRef. The second parameter, blocking, decides whether to wait for the result.

removeBroadcast

Removes all blocks that belong to the given broadcast.

def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
  val future = driverEndpoint.askSync[Future[Seq[Int]]](
    RemoveBroadcast(broadcastId, removeFromMaster))
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove broadcast $broadcastId" +
        s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}

It is implemented by sending a RemoveBroadcast message to the driver through the RpcEndpointRef. The third parameter, blocking, decides whether to wait for the result.

getMemoryStatus

Returns the memory status of every BlockManager: the maximum memory allocated to it and the memory remaining.

def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
  driverEndpoint.askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}

It is implemented by sending a GetMemoryStatus object to the driver through the RpcEndpointRef.

getStorageStatus

Returns the storage status.

def getStorageStatus: Array[StorageStatus] = {
  driverEndpoint.askSync[Array[StorageStatus]](GetStorageStatus)
}

It is implemented by sending a GetStorageStatus object to the driver through the RpcEndpointRef.

getBlockStatus

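Returns the status of a block on all BlockManagers. The operation is expensive and is meant for testing only.
It is implemented by sending a GetBlockStatus message to the driver through the RpcEndpointRef.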

getMatchingBlockIds

Returns the BlockIds that match the given filter.

def getMatchingBlockIds(
    filter: BlockId => Boolean,
    askSlaves: Boolean): Seq[BlockId] = {
  val msg = GetMatchingBlockIds(filter, askSlaves)
  val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
  timeout.awaitResult(future)
}

It is implemented by sending a GetMatchingBlockIds message to the driver through the RpcEndpointRef.

hasCachedBlocks

Checks whether the given executor has cached blocks, not counting broadcast blocks.

def hasCachedBlocks(executorId: String): Boolean = {
  driverEndpoint.askSync[Boolean](HasCachedBlocks(executorId))
}

It is implemented by sending a HasCachedBlocks message to the driver through the RpcEndpointRef.

From the methods above we can see that BlockManagerMaster does not run only on the driver; it also runs on executors. Internally it holds the RpcEndpointRef used to talk to the driver. When SparkEnv creates the BlockManager, it passes the BlockManagerMaster to it.

Next, BlockManagerMasterEndpoint

As shown above, BlockManagerMaster sends its requests to BlockManagerMasterEndpoint, and BlockManagerMasterEndpoint is what answers them.

First, the members of BlockManagerMasterEndpoint:

blockManagerInfo maps BlockManagerId to BlockManagerInfo.
blockManagerIdByExecutor maps executor id to BlockManagerId.
blockLocations maps BlockId to the set of BlockManagerIds that hold the block.
topologyMapper is the topology mapping implementation.

topologyMapper is defined by first reading the configuration key "spark.storage.replication.topologyMapper"; if it is not set, the default DefaultTopologyMapper is used. The class is then instantiated with the SparkConf.

Next we go through how BlockManagerMasterEndpoint answers each request from BlockManagerMaster.

register: the response to RegisterBlockManager

The register method answers the RegisterBlockManager message. It is defined as follows:

private def register(
    idWithoutTopologyInfo: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  val id = BlockManagerId(
    idWithoutTopologyInfo.executorId,
    idWithoutTopologyInfo.host,
    idWithoutTopologyInfo.port,
    topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
  val time = System.currentTimeMillis()
  if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(oldId) =>
        // A block manager of the same executor already exists, so remove it (assumed dead)
        logError("Got two different block manager registrations on same executor - "
          + s" will replace old one $oldId with new one $id")
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
    blockManagerIdByExecutor(id.executorId) = id
    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
    Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
  id
}

The method returns a BlockManagerId which, unlike the one passed in, contains topology information obtained from topologyMapper. It then checks whether blockManagerInfo (the BlockManagerId to BlockManagerInfo map) already has an entry for this BlockManagerId. If it does, the registration steps are skipped; processing continues only when there is no entry.
Next it checks blockManagerIdByExecutor. An existing entry means a BlockManager for the same executor is already registered, so the old BlockManager of that executor is removed and the new one is added to blockManagerIdByExecutor. A BlockManagerInfo is then built from the BlockManagerId, the current time, the maximum on-heap memory, the maximum off-heap memory, and the RpcEndpointRef of the registering BlockManager, and stored in blockManagerInfo. In both cases a SparkListenerBlockManagerAdded event is posted to listenerBus.
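The replace-on-re-register behaviour can be sketched with two mutable maps. This is a simplified model under stated assumptions: ToyId, topologyFor and the fixed "/default-rack" value are hypothetical, and a plain Long stands in for BlockManagerInfo.

```scala
import scala.collection.mutable

object RegisterSketch {
  // Simplified stand-in for BlockManagerId (hypothetical, not Spark's class).
  final case class ToyId(executorId: String, host: String, port: Int, topology: Option[String])

  val infoById = mutable.HashMap.empty[ToyId, Long]        // id -> max memory
  val idByExecutor = mutable.HashMap.empty[String, ToyId]  // executor id -> id

  // Hypothetical topology lookup; a real mapper would inspect the host.
  def topologyFor(host: String): Option[String] = Some("/default-rack")

  def register(idWithoutTopology: ToyId, maxMem: Long): ToyId = {
    // The returned id is the input id enriched with topology information.
    val id = idWithoutTopology.copy(topology = topologyFor(idWithoutTopology.host))
    if (!infoById.contains(id)) {
      // A different id for the same executor is assumed dead and dropped first.
      idByExecutor.get(id.executorId).foreach { oldId =>
        infoById.remove(oldId)
        idByExecutor.remove(oldId.executorId)
      }
      idByExecutor(id.executorId) = id
      infoById(id) = maxMem
    }
    id
  }
}
```

Registering executor "1" twice with different ports leaves exactly one entry in each map, pointing at the newer id.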

updateBlockInfo: the response to UpdateBlockInfo

updateBlockInfo answers the UpdateBlockInfo message. It is defined as follows:

private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    if (blockManagerId.isDriver && !isLocal) {
      return true
    } else {
      return false
    }
  }
  // Only refresh the last-seen time of the BlockManagerInfo
  if (blockId == null) {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    return true
  }
  blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
  var locations: mutable.HashSet[BlockManagerId] = null
  if (blockLocations.containsKey(blockId)) {
    locations = blockLocations.get(blockId)
  } else {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }
  if (storageLevel.isValid) {
    locations.add(blockManagerId)
  } else {
    locations.remove(blockManagerId)
  }
  // Remove the block from master tracking if it has been removed on all slaves.
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  }
  true
}

Because this method is an update, it returns false when the given BlockManagerId is not registered (the driver outside local mode is the exception). A null blockId only refreshes the last-seen time. Otherwise it calls updateBlockInfo on the BlockManagerInfo, which is covered later together with BlockManagerInfo. It then records the BlockId to BlockManagerId mapping. The data of one BlockId can live in several BlockManagers (spread out, or as multiple replicas), so each BlockId maps to a HashSet of BlockManagerIds. Whether the BlockManagerId is added to that set depends on the storageLevel argument: it is added only when storageLevel is valid (the level uses memory or disk and the replication count is greater than 0) and removed otherwise. Finally blockLocations is cleaned up: a BlockId with no remaining location is deleted from it.
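The bookkeeping on blockLocations can be illustrated in isolation. The sketch below uses plain strings for block and manager ids and a Boolean in place of storageLevel.isValid; LocationsSketch and its names are assumptions made for the example.

```scala
import scala.collection.mutable

object LocationsSketch {
  // block id -> ids of the managers that currently hold the block
  val blockLocations = mutable.HashMap.empty[String, mutable.HashSet[String]]

  def update(blockId: String, managerId: String, levelIsValid: Boolean): Unit = {
    val locations = blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty[String])
    // A valid level records the location; an invalid one means the block was dropped there.
    if (levelIsValid) locations += managerId else locations -= managerId
    // Stop tracking the block once no manager holds it any more.
    if (locations.isEmpty) blockLocations.remove(blockId)
  }
}
```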

getLocations: the response to GetLocations

getLocations answers the GetLocations message. It is defined as follows:

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}

We saw in updateBlockInfo that blockLocations maps a BlockId to its BlockManagerIds, so given a BlockId the list of BlockManagerIds can be looked up directly.

getLocationsMultipleBlockIds: the response to GetLocationsMultipleBlockIds

getLocationsMultipleBlockIds answers the GetLocationsMultipleBlockIds message. It is defined as follows:

private def getLocationsMultipleBlockIds(
    blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
  blockIds.map(blockId => getLocations(blockId))
}

Compared with getLocations, this method takes a list of BlockIds, so it simply looks up the locations of each BlockId in turn and returns the results in the same order.

getPeers: the response to GetPeers

getPeers answers the GetPeers message. It is defined as follows:

private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  val blockManagerIds = blockManagerInfo.keySet
  if (blockManagerIds.contains(blockManagerId)) {
    blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
  } else {
    Seq.empty
  }
}

The method returns the other non-driver BlockManagerIds known to the endpoint. The keySet of blockManagerInfo contains every registered BlockManagerId, so it only has to filter out the driver and the BlockManagerId that made the request. If the requesting BlockManagerId is not registered, the result is empty.
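The filtering can be tried out on a small set. In this sketch ToyId is a hypothetical stand-in for BlockManagerId, and treating the executor id "driver" as the driver is an assumption of the example.

```scala
object PeersSketch {
  // Hypothetical id type; "driver" marks the driver in this sketch.
  final case class ToyId(executorId: String) {
    def isDriver: Boolean = executorId == "driver"
  }

  // Everyone registered, minus the driver and the caller itself.
  def peers(registered: Set[ToyId], self: ToyId): Seq[ToyId] =
    if (registered.contains(self)) {
      registered.filterNot(_.isDriver).filterNot(_ == self).toSeq
    } else {
      Seq.empty
    }
}
```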

getExecutorEndpointRef: the response to GetExecutorEndpointRef

getExecutorEndpointRef answers the GetExecutorEndpointRef message. It is defined as follows:

private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
  for (
    blockManagerId <- blockManagerIdByExecutor.get(executorId);
    info <- blockManagerInfo.get(blockManagerId)
  ) yield {
    info.slaveEndpoint
  }
}

When register builds the BlockManagerInfo, its last argument is the slaveEndpoint (an RpcEndpointRef). getExecutorEndpointRef therefore only needs to map the executorId to a BlockManagerId and the BlockManagerId to a BlockManagerInfo to obtain that slaveEndpoint.
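The two-step lookup relies on a for-comprehension over Option, which yields None as soon as either step finds nothing. A minimal sketch with made-up map contents (EndpointLookupSketch and the "rpc://host1" string are illustrative only):

```scala
object EndpointLookupSketch {
  val idByExecutor = Map("exec-1" -> "bm-1", "exec-2" -> "bm-2")
  val endpointById = Map("bm-1" -> "rpc://host1") // bm-2 has no info on purpose

  // None if the executor is unknown or its block manager has no info entry.
  def endpointFor(executorId: String): Option[String] =
    for {
      id <- idByExecutor.get(executorId)
      endpoint <- endpointById.get(id)
    } yield endpoint
}
```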

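memoryStatus: the response to GetMemoryStatus

memoryStatus answers the GetMemoryStatus message. It is defined as follows:

private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
  blockManagerInfo.map { case(blockManagerId, info) =>
    (blockManagerId, (info.maxMem, info.remainingMem))
  }.toMap
}

The logic is simple. BlockManagerInfo already stores the maximum memory (maximum on-heap plus maximum off-heap), and every call to updateBlockInfo adjusts the remaining memory, so the BlockManagerInfo alone gives the memory status. The drawback is that the on-heap and off-heap usage cannot be told apart.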

storageStatus: the response to GetStorageStatus

storageStatus answers the GetStorageStatus message. It is defined as follows:

private def storageStatus: Array[StorageStatus] = {
  blockManagerInfo.map { case (blockManagerId, info) =>
    new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
      Some(info.maxOffHeapMem), info.blocks.asScala)
  }.toArray
}

StorageStatus is the storage status of one BlockManager (identified by its BlockManagerId): the maximum memory, the maximum on-heap memory, the maximum off-heap memory, and the block list (a collection of BlockId -> BlockStatus entries). All of this is stored in BlockManagerInfo, so having the BlockManagerInfo is enough.

blockStatus: the response to GetBlockStatus

blockStatus answers the GetBlockStatus message. It is defined as follows:

private def blockStatus(
    blockId: BlockId,
    askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
  val getBlockStatus = GetBlockStatus(blockId)
  /*
   * Rather than blocking on the block status query, master endpoint should simply return
   * Futures to avoid potential deadlocks. This can arise if there exists a block manager
   * that is also waiting for this master endpoint's response to a previous message.
   */
  blockManagerInfo.values.map { info =>
    val blockStatusFuture =
      if (askSlaves) {
        info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
      } else {
        Future { info.getStatus(blockId) }
      }
    (info.blockManagerId, blockStatusFuture)
  }.toMap
}

Gets the status of a BlockId. If askSlaves is set, the RpcEndpointRef in each info is used to fetch the latest BlockStatus; otherwise the status currently held in BlockManagerInfo is used. One BlockId may correspond to several BlockManagerIds, which is why the result is keyed by BlockManagerId.

getMatchingBlockIds: the response to GetMatchingBlockIds

getMatchingBlockIds answers the GetMatchingBlockIds message. It is defined as follows:

private def getMatchingBlockIds(
    filter: BlockId => Boolean,
    askSlaves: Boolean): Future[Seq[BlockId]] = {
  val getMatchingBlockIds = GetMatchingBlockIds(filter)
  Future.sequence(
    blockManagerInfo.values.map { info =>
      val future =
        if (askSlaves) {
          info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
        } else {
          Future { info.blocks.asScala.keys.filter(filter).toSeq }
        }
      future
    }
  ).map(_.flatten.toSeq)
}

The details of a BlockManager are represented by a BlockManagerInfo object, whose blocks member stores the BlockId to BlockStatus mapping. The filter is applied to those BlockIds, and the matching ones from every BlockManager are combined into one result.
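The pattern of querying every block manager in parallel and flattening the replies can be sketched with Future.sequence. The data in blocksByManager, the helper names and the 5 second timeout are assumptions for the illustration; only the local (non askSlaves) branch is modelled.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MatchingSketch {
  // Hypothetical per-manager block lists, standing in for BlockManagerInfo.blocks.
  val blocksByManager: Map[String, Seq[String]] = Map(
    "bm-1" -> Seq("rdd_0_0", "broadcast_1"),
    "bm-2" -> Seq("rdd_0_1"))

  // One Future per manager, combined into a single Future of all matches.
  def matching(filter: String => Boolean): Future[Seq[String]] =
    Future.sequence(
      blocksByManager.values.toSeq.map(blocks => Future(blocks.filter(filter)))
    ).map(_.flatten)

  // Blocking helper for callers that need the plain result.
  def rddBlocks(): Seq[String] =
    Await.result(matching(_.startsWith("rdd_")), 5.seconds)
}
```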

removeRdd: the response to RemoveRdd

removeRdd answers the RemoveRdd message. It is defined as follows:

private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocks.foreach { blockId =>
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    blockLocations.remove(blockId)
  }
  val removeMsg = RemoveRdd(rddId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Int](removeMsg)
    }.toSeq
  )
}

We already know that blockLocations maps a BlockId to a Set[BlockManagerId]. Whether a BlockId belongs to an RDD can be determined with asRDDId, which gives the BlockIds of the requested RDD. Each BlockId leads to its list of BlockManagerIds, and blockManagerInfo gives the BlockManagerInfo for each BlockManagerId. Calling removeBlock on the BlockManagerInfo drops the bookkeeping for that RDD block (releasing the recorded space usage). Finally every BlockManager is told to remove the RDD through the slaveEndpoint in its BlockManagerInfo.
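The master-side half of this, selecting the blocks of one RDD and dropping them from blockLocations, can be sketched with string ids. The "rdd_<rddId>_<splitIndex>" naming and the sample data are assumptions of the sketch, and the messages to the slaves are left out.

```scala
import scala.collection.mutable

object RemoveRddSketch {
  // Hypothetical tracking table: block name -> managers holding it.
  val blockLocations = mutable.HashMap(
    "rdd_1_0" -> Set("bm-1", "bm-2"),
    "rdd_2_0" -> Set("bm-1"),
    "broadcast_3" -> Set("bm-2"))

  // Assumed naming scheme for RDD blocks: rdd_<rddId>_<splitIndex>.
  private val RddBlock = "rdd_([0-9]+)_([0-9]+)".r

  // Removes the tracking entries of one RDD and returns the removed block names.
  def removeRdd(rddId: Int): List[String] = {
    val blocks = blockLocations.keys.collect {
      case b @ RddBlock(id, _) if id.toInt == rddId => b
    }.toList
    blocks.foreach(b => blockLocations.remove(b))
    blocks
  }
}
```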

removeShuffle: the response to RemoveShuffle

removeShuffle answers the RemoveShuffle message. It is defined as follows:

private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
  // Nothing to do in the BlockManagerMasterEndpoint data structures
  val removeMsg = RemoveShuffle(shuffleId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Boolean](removeMsg)
    }.toSeq
  )
}

The implementation resembles removeRdd, except that nothing has to change in the endpoint's own data structures. It iterates over the values of blockManagerInfo (the BlockManagerInfo collection) and uses each info's RpcEndpointRef to request removal of the shuffle by sending a RemoveShuffle message.

removeBroadcast: the response to RemoveBroadcast

removeBroadcast answers the RemoveBroadcast message. It is defined as follows:

private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
  val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
  val requiredBlockManagers = blockManagerInfo.values.filter { info =>
    removeFromDriver || !info.blockManagerId.isDriver
  }
  Future.sequence(
    requiredBlockManagers.map { bm =>
      bm.slaveEndpoint.ask[Int](removeMsg)
    }.toSeq
  )
}

The implementation is similar to removeShuffle. It iterates over the values of blockManagerInfo (the BlockManagerInfo collection), skipping the driver's BlockManager unless removeFromDriver is set, and uses each info's RpcEndpointRef to request removal of the broadcast variable.

removeBlockFromWorkers: the response to RemoveBlock

removeBlockFromWorkers answers the RemoveBlock message. It is defined as follows:

private def removeBlockFromWorkers(blockId: BlockId) {
  val locations = blockLocations.get(blockId)
  if (locations != null) {
    locations.foreach { blockManagerId: BlockManagerId =>
      val blockManager = blockManagerInfo.get(blockManagerId)
      if (blockManager.isDefined) {
        // Remove the block from the slave's BlockManager.
        // Doesn't actually wait for a confirmation and the message might get lost.
        // If message loss becomes frequent, we should add retry logic here.
        blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
      }
    }
  }
}

The logic is similar to removeShuffle. blockLocations maps a BlockId to a Set[BlockManagerId], which tells us which BlockManagers hold the block. blockManagerInfo maps a BlockManagerId to its BlockManagerInfo, from which the slave's RpcEndpointRef is obtained and used to send the block removal request to that slave.

removeExecutor: the response to RemoveExecutor

removeExecutor answers the RemoveExecutor message. It is defined as follows:

private def removeExecutor(execId: String) {
  logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
  blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}

Removes all blocks of an executor. blockManagerIdByExecutor records the BlockManager on each executor, and removeBlockManager is then called to remove that BlockManager.

The implementation of removeBlockManager

private def removeBlockManager(blockManagerId: BlockManagerId) {
  val info = blockManagerInfo(blockManagerId)
  blockManagerIdByExecutor -= blockManagerId.executorId
  blockManagerInfo.remove(blockManagerId)
  val iterator = info.blocks.keySet.iterator
  while (iterator.hasNext) {
    val blockId = iterator.next
    val locations = blockLocations.get(blockId)
    locations -= blockManagerId
    if (locations.size == 0) {
      blockLocations.remove(blockId)
      logWarning(s"No more replicas available for $blockId !")
    } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
      val maxReplicas = locations.size + 1
      val i = (new Random(blockId.hashCode)).nextInt(locations.size)
      val blockLocations = locations.toSeq
      val candidateBMId = blockLocations(i)
      blockManagerInfo.get(candidateBMId).foreach { bm =>
        val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
        val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
        bm.slaveEndpoint.ask[Boolean](replicateMsg)
      }
    }
  }
  listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
  logInfo(s"Removing block manager $blockManagerId")
}

The logic is straightforward. The BlockManagerId leads to its BlockManagerInfo, which holds the list of blocks it manages, and blockLocations gives, for each BlockId, the BlockManagerIds the block is spread over (for example its replicas). The removed BlockManagerId is taken out of each of those sets, and a block with no location left is dropped from blockLocations. Otherwise, when proactive replication is enabled, one of the remaining BlockManagers is sent a ReplicateBlock message through its RpcEndpointRef to restore the replica count of the block. Finally listenerBus broadcasts that the BlockManagerId has been removed.
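The choice of the replication candidate can be isolated in a small function. This sketch uses string ids; ReplicationSketch and plan are hypothetical names, and the sorting step is added only to make the toy deterministic.

```scala
import scala.util.Random

object ReplicationSketch {
  // For a block that just lost one replica, returns
  // (candidate to do the copy, other remaining locations, target replica count),
  // or None when no replica is left at all.
  def plan(blockId: String, remaining: Set[String]): Option[(String, Seq[String], Int)] =
    if (remaining.isEmpty) {
      None
    } else {
      val locations = remaining.toSeq.sorted // sorted only to keep the sketch deterministic
      // Seeding with the block id's hash makes the pick repeatable per block.
      val i = new Random(blockId.hashCode).nextInt(locations.size)
      val candidate = locations(i)
      Some((candidate, locations.filter(_ != candidate), locations.size + 1))
    }
}
```

The target count is the number of remaining replicas plus one, that is, the count before the block manager was lost.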

The response to StopBlockManagerMaster

The response to StopBlockManagerMaster is very simple: it replies true and then calls stop directly.

heartbeatReceived: the response to BlockManagerHeartbeat

heartbeatReceived answers the BlockManagerHeartbeat message. It is implemented as follows:

private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    blockManagerId.isDriver && !isLocal
  } else {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    true
  }
}

Every BlockManagerInfo has a variable _lastSeenMs that records the time of its last heartbeat. The updateLastSeenMs method refreshes it.
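The heartbeat rule can be modelled with a single map from id to last-seen time. HeartbeatSketch, its parameters and the explicit now argument are assumptions made to keep the example deterministic.

```scala
import scala.collection.mutable

object HeartbeatSketch {
  // block manager id -> time of the last heartbeat, in milliseconds
  val lastSeenMs = mutable.HashMap.empty[String, Long]

  // Returns false for an unknown block manager, except for a driver outside local mode.
  def heartbeat(id: String, isDriver: Boolean, isLocal: Boolean, now: Long): Boolean =
    if (!lastSeenMs.contains(id)) {
      isDriver && !isLocal
    } else {
      lastSeenMs(id) = now
      true
    }
}
```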

That completes the walkthrough of BlockManagerMasterEndpoint. All of these operations are built on two mappings: BlockId to BlockManagerId, and BlockManagerId to BlockManagerInfo. BlockManagerInfo holds the details about the blocks, so it is worth a brief look.

BlockManagerInfo

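BlockManagerInfo stores the details of one BlockManager: the BlockManagerId, the maximum on-heap memory, the maximum off-heap memory, the RpcEndpointRef that connects to the BlockManager, the time of the last heartbeat, the maximum memory, the remaining memory, and the BlockIds the BlockManager manages. The collection that tracks the BlockIds maps each BlockId to its BlockStatus.
The most important methods are removeBlock and updateBlockInfo. Both affect the recorded memory usage of the BlockManager and the mapping of the blocks involved.
We analyze these two methods next: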

removeBlock

The definition of removeBlock is simple:

def removeBlock(blockId: BlockId) {
  if (_blocks.containsKey(blockId)) {
    _remainingMem += _blocks.get(blockId).memSize
    _blocks.remove(blockId)
  }
  _cachedBlocks -= blockId
}

_blocks maps BlockId to BlockStatus, so removeBlock removes the given BlockId from _blocks and gives back the memory the block occupied. It does not remove the block data itself; presumably the slave node performs the actual deletion, and this method only updates the information held on the BlockManagerMaster side.

updateBlockInfo

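updateBlockInfo updates the state that BlockManagerInfo keeps about its BlockManager, that is, the fields listed at the start of this section. _blocks maps BlockId to BlockStatus; the BlockId to BlockManagerId mapping is kept in blockLocations in BlockManagerMasterEndpoint.
Here is the definition of updateBlockInfo: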

def updateBlockInfo(
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long) {
  updateLastSeenMs()
  val blockExists = _blocks.containsKey(blockId)
  var originalMemSize: Long = 0
  var originalDiskSize: Long = 0
  var originalLevel: StorageLevel = StorageLevel.NONE
  if (blockExists) {
    // The block exists on the slave already.
    val blockStatus: BlockStatus = _blocks.get(blockId)
    originalLevel = blockStatus.storageLevel
    originalMemSize = blockStatus.memSize
    originalDiskSize = blockStatus.diskSize
    if (originalLevel.useMemory) {
      _remainingMem += originalMemSize
    }
  }
  if (storageLevel.isValid) {
    /* isValid means it is either stored in-memory or on-disk.
     * The memSize here indicates the data size in or dropped from memory,
     * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
     * and the diskSize here indicates the data size in or dropped to disk.
     * They can be both larger than 0, when a block is dropped from memory to disk.
     * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
    var blockStatus: BlockStatus = null
    if (storageLevel.useMemory) {
      blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
      _blocks.put(blockId, blockStatus)
      _remainingMem -= memSize
      if (blockExists) {
        logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
          s" (current size: ${Utils.bytesToString(memSize)}," +
          s" original size: ${Utils.bytesToString(originalMemSize)}," +
          s" free: ${Utils.bytesToString(_remainingMem)})")
      } else {
        logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
          s" (size: ${Utils.bytesToString(memSize)}," +
          s" free: ${Utils.bytesToString(_remainingMem)})")
      }
    }
    if (storageLevel.useDisk) {
      blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
      _blocks.put(blockId, blockStatus)
      if (blockExists) {
        logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
          s" (current size: ${Utils.bytesToString(diskSize)}," +
          s" original size: ${Utils.bytesToString(originalDiskSize)})")
      } else {
        logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
          s" (size: ${Utils.bytesToString(diskSize)})")
      }
    }
    if (!blockId.isBroadcast && blockStatus.isCached) {
      _cachedBlocks += blockId
    }
  } else if (blockExists) {
    // If isValid is not true, drop the block.
    _blocks.remove(blockId)
    _cachedBlocks -= blockId
    if (originalLevel.useMemory) {
      logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
        s" (size: ${Utils.bytesToString(originalMemSize)}," +
        s" free: ${Utils.bytesToString(_remainingMem)})")
    }
    if (originalLevel.useDisk) {
      logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
        s" (size: ${Utils.bytesToString(originalDiskSize)})")
    }
  }
}

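_blocks holds the BlockId to BlockStatus mapping. A BlockStatus contains the storage level, the memory size used and the disk size used. The method first refreshes the heartbeat time, then checks whether this BlockManagerInfo already has an entry for the BlockId being updated. If it does, the memory charged for the existing BlockStatus is returned to the remaining memory, because the remaining memory is recomputed from the new data. Next it checks whether the storage level is valid (the level uses memory or disk and the replication count is greater than 0). If it is invalid and the blockId existed before, the block's storage level has changed and the block is being dropped, so the BlockId is removed from both _blocks and _cachedBlocks (the non-broadcast cached BlockIds managed by this BlockManagerInfo). If the storage level is valid, a BlockStatus is created from the new storage level, memory size and disk size and put into _blocks; a BlockId that is not a broadcast block and occupies memory or disk is also added to _cachedBlocks.
Overall, the updateBlockInfo method of BlockManagerInfo refreshes the cached information in BlockManagerInfo with the new data for a BlockId; the update replaces the old values rather than adding to them.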
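The replacement-style accounting of the remaining memory can be shown with a reduced model. This sketch tracks memory only and ignores disk sizes and _cachedBlocks; MemoryAccountingSketch, ToyInfo and Status are hypothetical names, not Spark classes.

```scala
import scala.collection.mutable

object MemoryAccountingSketch {
  final case class Status(useMemory: Boolean, memSize: Long)

  class ToyInfo(val maxMem: Long) {
    private val blocks = mutable.HashMap.empty[String, Status]
    private var remaining = maxMem

    def remainingMem: Long = remaining

    // Replacement-style update: give back the old size first, then charge the new one.
    def update(blockId: String, useMemory: Boolean, memSize: Long): Unit = {
      blocks.get(blockId).filter(_.useMemory).foreach(old => remaining += old.memSize)
      if (useMemory) {
        blocks(blockId) = Status(true, memSize)
        remaining -= memSize
      } else {
        // Stands in for an invalid storage level: the block is dropped.
        blocks.remove(blockId)
      }
    }

    // Drops the bookkeeping entry and releases the memory recorded for it.
    def remove(blockId: String): Unit =
      blocks.remove(blockId).foreach(old => remaining += old.memSize)
  }
}
```

Updating the same block from 30 to 50 bytes against a 100 byte budget leaves 50 bytes remaining, not 20, because the old charge is returned before the new one is applied.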

BlockManager

We have now seen how BlockManagerMaster and BlockManagerMasterEndpoint work together. Next comes the other part: BlockManager.
Compared with BlockManagerMaster, BlockManager is far more complex. It involves the combined work of SerializerManager, MemoryManager, MapOutputTracker, ShuffleManager, BlockTransferService and SecurityManager.