在SparkEnv中创建BlockManagerMaster和BlockManagerMasterEndpoint,在生成BlockManagerMaster的时候需要BlockManagerMasterEndpoint(作用是什么??????????)
除此之外,BlockManager也会在SparkEnv中进行创建,并且创建BlockManager的时候,需要BlockManagerMaster的引用(作用是什么???????),而且还需要一个BlockTransferService(作用是什么???????)。
BlockManagerMaster在driver和executor上都有运行。
首先看一下BlockManagerMaster的实现
创建BlockManagerMaster的时使用的RpcEndpointRef,保持与driver的通信,接下来BlockManagerMaster中的方法,都会用到这个RpcEndpointRef。
BlockManagerMaster中的RpcEndpointRef用于向driver发送消息,dirver中会向其他BlockManagerMaster同步这个消息。
比如在BlockManagerMaster的stop方法
只有dirver节点的BlockMasterMaster停止时,才会向driver通知StopBlockManagerManager事件。
tell方法调用BlockManagerMaster的RpcEndpointRef的askSync方法发送一个事件,并期待得到true,否则抛出SparkException异常。
需要注意的是,这里调用的方法是askSync,同步请求。除此之外,RpcEndpointRef还有ask方法,是异步执行的。
上面我们看到了stop方法的实现,接下来我们顺序看一下:
removeExecutor
移除Executor,其实现是向driver发送一个RemoveExecutor对象。
|
|
removeExecutorAsync
异步移除Executor,与removeExecutor的操作类似,只是使用的RpcEndpointRef的异步方法。
registerBlockManager
向dirver注册Blockmanager的id(可以这么理解吗??????)。输入的BlockManagerId对象,不包含拓扑信息。注册返回的BlockManagerId会用来更新BlockManagerMaster的数据。
|
|
其实现就是通过RpcEndpointRef向driver发送一个RegisterBlockManager事件,事件中包含BlockManagerId、最大堆内内存和slaveEndpoint(RpcEndpointRef)。
updateBlockInfo
updateBlockInfo方法用于向driver发送某个Block的最新信息,信息包括存储级别、内存大小和磁盘大小。
其实现是通过RpcEndpointRef向dirver发送UpdateBlockInfo事件。
getLocations
获取指定BlockId的位置。
其实现是通过RpcEndpointRef向driver发送一个GetLocations事件。
getLocations
获取多个BlockId的位置
其实现是通过RpcEndpointRef向driver发送一个GetLocationsMultipleBlockIds事件。
contains
判断当前BlockManagerMaster中是否包含指定的Block
其实现为,通过RpcEndpointRef向driver发送一个获取BlockId位置的请求,如果可以获得表示Block存在。
getPeers
从driver那里获取集群中其他BlockManagerId
通过RpcEndpoint向driver发送一个GetPeers事件
getExecutorEndpointRef
获取指定executor的RpcEndpointRef信息。
其实现是,通过RpcEndpointRef向driver发送一个GetExecutorEndpointRef事件。
removeBlock
移除指定的Block,只有driver知道的Block才能够被移除。
通过RpcEndpointRef向driver发送RemoveBlock事件来实现。
removeRdd
移除所有归属于指定RDD的Block
通过RpcEndpointRef向driver发送RemoveRdd来实现,第二个参数决定是否要等到结果返回。
removeShuffle
移除所有属于指定Shuffle的Block。
通过RpcEndpointRef向driver发送RemoveShuffle事件来实现,第二个参数决定是否要等到结果返回。
removeBroadcast
移除所有归属于指定Broadcast的Block。
通过RpcEndpointRef向driver发送RemoveBroadcast事件来实现。第三个参数blocking决定是否要等到结果返回。
getMemoryStatus
获取每个BlockManager的内存状态,返回每个BlockManager所分配的最大内存以及内存的剩余。
通过RpcEndpointRef向driver发送GetMemoryStatus对象来实现。
getStorageStatus
获取存储存储状态
通过RpcEndpointRef向driver发送GetStorageStatus对象来实现。
getBlockStatus
获取所有Block Manager上的block的状态。该操作开销昂贵,仅用于测试。
通过RpcEndpointRef向driver发送GetBlockStatus事件来实现。
getMatchingBlockIds
返回符合过滤器的BlockId
通过RpcEndpointRef向driver发送GetMatchingBlockIds事件来实现。
hasCachedBlock
判断指定Executor是否缓存了Block,不包括broadcast block。
通过RpcEndpointRef向driver发送HasCachedBlocks事件来实现。
通过对上面方法的了解我们知道,BlockManagerMaster不一定是运行在driver上的,也会运行在Executor上。它内部持有与driver进行沟通的RpcEndpointRef。在SparkEnv中,生成BlockManager的时候,会将BlockManagerMaster传递给BlockManager。
接下来看一下BlockManagerMasterEndpoint
上面我们已经知道了BlockManagerMaster会通过BlockManagerMasterEndpoint来请求相关的操作。BlockManagerMasterEndpoint就是用来对BlockManagerMaster的请求进行响应的。
首先看一下BlockManagerMasterEndpoint的成员
blockManagerInfo 存储了BlockManagerId到BlockManagerInfo的映射关系。
blockManagerIdByExecutor 存储了executor id 到 BlockManagerId的映射关系。
blockLocations 存储了BlockId到BlockManagerId的映射关系。
topologyMapper 拓扑映射的实现类。
上面topologyMapper的定义,首先读取配置项“spark.storage.replication.topologyMapper”的值,如果没有配置则使用默认的拓扑管理DefaultTopologyMapper。然后使用SparkConf进行实例化。
接下来针对BlockManagerMasterEndpoint对BlockManagerMaster的应答进行解析
RegisterBlockManager的应答register
register方法是对RegisterBlockManager事件的应答,它的定义如下:
该方法返回的是一个BlockManagerId,与输入参数不同,返回的对象中包含了拓扑信息。而这个拓扑信息就是从topologyManager中得到的。接下来,判断BlockManagerId到BlockManagerInfo的信息中是否含有当前BlockManagerId的信息,如果含有,则跳过执行,只有在不含有的时候,才会继续处理。
接着判断是否在blockManagerIdByExecutor中,如果有,则说明相同executor的BlockManager已经存在了,需要将这个executor的老的BlockManager移除掉,然后将新的BlockManager添加到blockManagerIdByExecutor中。并且根据BlockManagerId.id、最大堆内内存、最大堆外内存、以及要注册的BlockManagerId的RpcEndpointRef生成BlockManagerInfo对象,添加到BlockManagerId到BlockManagerInfo的映射关系中。
UpdateBlockInfo的应答updateBlockInfo
updateBlockInf是对UpdateBlockInfo事件的应答,它的定义如下:
因为该方法的功能为更新,所以如果参数给定的BlockManagerId不存在会返回false(driver除外)。然后调用BlockManagerInfo中的updateBlockInfo方法来更新BlockManagerInfo,这个方法稍后介绍BlockManagerInfo的时候再看。接着,记录BlockId到BlockManagerId的映射关系,因为一个BlockId的数据可能存在多个BlockManager中(或者分散存储,或者有多个副本),因此BlockId对应的是一个HashSet,HashSet中存放的是BlockManagerId,但是需要注意的是,BlockManagerId是否可以加到上面的HashSet中,取决于参数的storageLevel,只有storageLevel有效时(存储级别为内存或磁盘,且副本个数大于0)才会存储。最后对blockLocations进行清理,对于没有存储位置的BlockId,从blockLocations中删除。
GetLocations的应答getLocations
getLocations是对GetLocations事件的应答,它的定义如下:
在updateBlockInfo方法中我们已经知道blockLocations存储的是BlockId到BlockManagerId的映射。因此,只要有BlockId,就可以找到BlockManagerId的列表。
GetLocationsMultipleBlockIds的应答getLocationsMultipleBlockIds
getLocationsMultipleBlockIds是对GetLocationsMultipleBlockIds事件的应答,它的定义如下:
相对于getLocations,这个方法传输的参数是BlockId列表,因此只需要迭代获取每个BlockId的位置,返回一个映射关系即可。
GetPeers的应答getPeers
getPeers是对GetPeers事件的应答,它的定义如下:
该方法就是获取BlockManagerId节点上其他非driver的BlockManagerId的列表。从BlockManagerId到BlockManagerInfo的映射关系中得到KeySet,就是当前节点上所有的BlockManagerId,只要过滤到driver以及与当前BlockManagerId相同id的BlockManagerId即可。
GetExecutorEndpointRef的应答getExecutorEndpointRef
getExecutorEndpointRef是GetExecutorEndpointRef事件的应答,它的定义如下:
我们在register方法中生成BlockManagerInfo时,BlockManagerInfo的最后一个参数就是slaveEndpoint(一个RpcEndpointRef对象)。所以调用getExecutorEndpointRef方法,只要根据executorId得到BlockManagerId,然后根据BlockManagerId得到BlockManagerInfo,就可以得到这个slaveEndpoint了。
GetMemoryStatus的应答memoryStatus
memoryStatus是GetMemoryStatus事件的应答,其定义如下:
方法逻辑也很简单,BlockManagerInfo中已经存着最大内存了(最大堆内内存和最大堆外内存),并且在每次执行updateBlockInfo方法时会对剩余内存进行操作,只要拿到BlockManagerInfo,就拿到了内存状态,但是缺点是无法知道具体的堆内内存和堆外内存的状态。
GetStorageStatus的应答storageStatus
storageStatus方法是对GetStorageStatus事件的应答,其定义如下:
StorageStatus,我们理解为存储状态,也就是每个BlockManager(用BlockManagerId表示)的存储状态,包括最大内存、最大堆内内存、最大堆外内存和Block列表(BlockId->BlockStatus的对应关系集合)。这些信息都存储在BlockManagerInfo中,只要拿到BlockManagerInfo就OK了。
GetBlockStatus的应答blockStatus
blockStatus方法是对GetBlockStatus事件的应答,其定义如下:
获取BlockId的状态,如果askSlaves,则会调用info中的RpcEndpointRef取获取最新的BlockSatus,否则就使用BlockManagerInfo中当前的。一个BlockId可能会对应多个BlockManagerId。
GetMatchingBlockIds的应答getMatchingBlockIds
getMatchingBlockIds方法是对GetMatchingBlockIds事件的应答,其定义如下:
我们已经知道BlockManager的具体信息是使用BlockManagerInfo对象来表示的,在BlockManagerInfo中的blocks就存储了BlockId到BlockStatus的映射关系,filter就是对BlockId对象进行过滤,并得到符合条件的BlockId。
RemoveRdd的应答removeRdd
removeRdd方法是对RemoveRdd事件的应答,其定义如下:
我们已经blockLocatins中存储的是BlockId到Set[BlockManagerId]的映射关系。一个BlockId是否是RDD,可以通过asRDDId得到,这样就可以得到符合条件的RDD的BlockId。通过BlockId能够得到BlockManagerId的列表,通过blockManagerInfo对象,我们可以得到BlockManagerId所对应的BlockManagerInfo。调用BlockManagerInfo中的removeBlock方法移除对RDD所对应的block的操作(空间使用的记录的释放)。然后告诉所有的BlockManager,要删除RDD(通过BlockManagerInfo中的slaveEndpointRef)。
RemoveShuffle的应答removeShuffle
removeShuffle方法是对RemoveShuffle事件的应答,其定义如下:
·
方法的实现和RemoveRdd方法类似,直接对blockManagerInfo中的values(BlockManagerInfo集合)扫描,调用info的rpcEndpointRef,请求移除Shuffle(发送RemoveShuffle事件)。
RemoveBroadcast的响应removeBroadcast
removeBroadcast方法是对RemoveBroadcast事件的响应,其定义如下:
方法的实现与removeShuffle方法类似,直接对BlockManagerInfo中的values(BlockManagerInfo集合)扫描,然后调用info的repEndpointRef,请求移除广播变量。
RemoveBlock的应答removeBlockFromWorkers
removeBlockFromWorkers方法是对RemoveBlock事件的应答,其定义如下:
方法的实现逻辑与removeShuffle方法类似,blockLocations中保存了BlockId到Set[BlockManagerId]的映射关系,能够得到BlockId存放在哪些BlockManager中。blockManagerInfo中保存着BlockManagerId到BlockManagerInfo的映射关系,从BlockManagerInfo中就可以得到slave的RpcEndpointRef,从而用来对slave发送Block移除请求。
RemoveExecutor的应答removeExecutor
removeExecutor方法是对RemoveExecutor事件的应答,其定义如下:
移除Executor所有的Block,blockManagerIdByExecutor中记录executor上的BlockManager,然后调用removeBlockManager方法来移除BlockManager。
removeBlockManager方法的实现
方法的逻辑很简单,根据BlockManagerId可以找到对应的BlockManagerInfo,BlockManagerInfo中保存着它所管理的Block列表,在blockLocations中,可以根据BlockId找到Block所分布的BlockManagerId(比如副本的分布)。然后调用分布的BlockManager的RpcEndpointRef,来发送ReplicateBlock事件,确保Block数据的副本数。最后利用listenerBus,广播对应的BlockManagerId已经被移除。
StopBlockManagerMaster的应答
StopBlockManagerMaster的应答很多简单,返回true,然后直接调用stop方法。
BlockManagerHeartbeat的应答heartbeatReceived
heartbeatReceived方法用来对BlockManagerHeartbeat事件进行响应,其实现如下:
在每个BlockManagerInfo中有一个变量_lastSeenMs,用来记录BlockManagerInfo的最后的心跳。通过updateLastSeenMs方法更新这个心跳。
通过上面,我就把BlockManagerMasterEndpoint都梳理了一遍,因此我们也了解到,这些操作,都是基于blockId到BlockManagerId的映射关系、BlockManagerId到BlockMangerInfo的映射关系来实现的。其中BlockManagerInfo保存了有关Block的详细信息。因此有必要对BlockManagerInfo进行一下简单的了解。
BlockManagerInfo
BlockManagerInfo存储了BlockManager的详细信息,如BlockManagerId、最大堆内内存、最大堆外内存、链接BlockManager的rpcEndpointRef、最后心跳时间、最大内存、剩余内存以及BlockManager所管理的BlockId信息。其中管理BlockId信息的集合存储的是BlockId到BlockStatus的映射。
其中最重要的方法是removeBlock和updateBlockInfo。这两个方法会影响BlockManager的内存使用和相关Block的映射关系。
接下来我们对这两个方法进行分析:
removeBlock
removeBlock的定义很简单:
_blocks是BlockId到BlockStatus的映射关系。因此在remove的实现中,就是将指定的BlockId从_blocks中移除,并将BlockId所占用的内存释放。这里没有对Block的实际移除进行操作,应该是slave节点执行删除完成后再调用,更BlockManagerMaster中的信息。
updateBlockInfo
updateBlockInfo方法用来更新BlockManagerInfo中对BlockManager的状态,也就是开始说的那些状态。_blocks是BlockId到BlockStatus的映射关系,至于BlockId与BlockManagerId的映射关系,保存在BlockManagerMasterEndpoint中的blockManagerInfo中了。
下面是updateBlockInfo的定义:
_blocks中保存了BlockId到BlockStatus的映射关系。BlockStatus中包含的信息有:存储级别、使用的内存size和使用的磁盘size。方法首先对心跳时间进行更新,接着获取判断当前BlockManagerInfo中是否已经包含了要更新的BlockId的信息,如果有,将以存在的BlockStatus从BlockManagerInfo中移除(主要是内存,从剩余内存中恢复占用的内存,因为使用更新数据重新计算剩余内存)。然后,判断存储级别是否有效(内存和磁盘级别的存储并且副本数大于0,才认为有效),如果无效,并且blockId以前存在,则说明BlockId改变了存储级别,则将BlockId从BlockManagerInfo中移除,包括_blocks和_cachedBlocks(存储了当前BlockManagerInfo所管理的所有BlockId);如果存储级别有效,则使用使用新的存储级别、内存size和磁盘size创建一个BlockStatus,然后将BlockStatus存入到_blocks中,对于非广播变量且已经使用了内存或磁盘的BlockId,添加到_cachedBlocks中。
总体来说BlockManagerInfo的updateManagerInfo方法就是用新的BlockId的信息来更新BlockManagerInfo中缓存的信息,只是这种更新不是增长式的是替换式的。
BlockManager
通过上面,我们已经了解到了BlockManagerMaster和BlockManagerMasterEndpoint之间的操作。接下来了解一下另外一部份:BlockManager。
BlockManager相较于BlockManagerMaster,它的功能就复杂多了。它会涉及SerializerManager、MemoryManager、MapOutputTracker、ShuffleManager、BlockTransferService、SecurityManager的联合操作。