本文记录对Spark Shuffle过程中对数据读写的梳理。本文将先从数据的读入作为切入点,也就是从ShuffleReader的源码作为切入,然后在逐步的展开。因此本文将从读取和写入两个角度来进行分析,最后在用流程图的方式将读取和写入整合起来。
数据读取
ShuffleReader的实现类
ShuffleRead是定义在org.apache.spark.shuffle包中的一个接口(理解为接口吧,其实是trait),它只定义了一个方法read(),并且这个方法返回一个Iterator(迭代器)。对于ShuffleReader的实现类,只有一个类 BlockStoreShuffleReader,与ShuffleReader定义在同一个包中。
ShuffleReader的read()方法的实现
read()基本的流程图参考如下如下
在read()方法中,首先生成一个ShuffleBlockFetcherIterator对象。该对象的详细介绍会在后面进行。
生成的ShuffleBlockFetcherIterator本身就是一个Iterator,但是需要注意这个Iterator中包含的数据都是进行了加密和压缩的,我们在之后分析ShuffleBlockFetcherIterator的时候会了解到。
接着,我们会得到序列化器的实例,将ShuffleBlockFectherIterator中的数据进行解析。
如果Shuffle的依赖中定义了聚合器,则需要进行一次聚合操作(将上面的迭代器中的数据再次处理):如果是map端聚合,则使用聚合器的 combineCombinersByKey来处理,否则使用combineValuesByKey来处理。
最后,创建一个ExternalSorter并将上一步的数据全部插入到这个sorter中,并使用这个ExternalSorter创建一个CompletionIterator返回。
大家在这里会看到聚合器和ExternalSorter,这就实现了Shuffle的基本操作。
ShuffleBlockFetcherIterator的实现与流程
生成这个对象需要一系列参数:
参数名 | 参数类型 | 参数的意义和作用 |
---|---|---|
context | TaskContext | Task的上下文 |
shuffleClient | ShuffleClient | 读取Shuffle数据的Client |
blockManager | BlockManager | Block的管理器,可以获取Block的相关信息 |
blocksByAddress | Seq | 根据BlockManagerId聚合的Block列表 |
streamWrapper | 函数 | 用于将BlockId和对应的InputStream进行包装的方法 |
maxBytesInFlight | Long | —— |
maxReqsInFlight | Int | 同时进行请求发送的最大数 |
maxBlocksInFlightPerAddress | Int | 同时拉取某个远程地址的block的请求数 |
maxReqSizeShuffleToMem | Long | 单个请求拉取数据的可用缓存,如果超出这个缓存则使用临时文件来拉取数据 |
detectCorrupt | Boolean | — |
通过以上参数,生成了我们的ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator实现了Iterator(迭代器)接口,同时也实现了TempShuffleFileManager(临时Shuffle文件管理器)。实现Iterator就意味着它有迭代器的相关方法(next、hasNext);实现TempShuffleFileManager,就意味这它能够创建临时Shuffle文件(createTempShuffleFile)、能够注册临时文件的清理(registerTempShuffleFileToClean)。接下来我们分别按照接口的实现来分析,首先看看比较简单的TempShuffleFileManager:
TempShuffleFileManager的实现
实现这个接口,我们需要实现两个方法 createTempShuffleFile 和 registerTempShuffleFileToClean。
createTempShuffleFile
这个方法的实现很简单,就是调用BlockManager的DiskBlockManager的createTempLocalBlock来创建一个临时本地Block并返回。
registerTempShuffleFileToCliean
这个方法的实现也很简单,就是将文件添加到一个名为 shuffleFileSet 的集合中,等到ShuffleBlockFetcherIterator中cleanup方法被执行的时候,就会将 shuffleFileSet 集合中的文件统一删除。
Iterator的实现
我们知道,ShuffleBlockFetcherIterator 最重要的功能是获取数据,这个过程会涉及到从远程拉取数据。
hasNext
方法实现很简单,就是判断已经处理的block数量是否达到需要拉取的block的数量(numBlocksProcessed < numBlocksToFetch)。
next
next方法的实现就复杂了,而且涉及到很多方法之间的调用,我们首先看看流程图,总体说明一下流程,然后在具体分析细节。
这个流程图说明了 ShuffleBlockFetcherIterator 内部获取数据时几个方法之间的调用关系,next是对用户的接口,用户通过这个方法获取数据。但是我们都应该想到,shuffle数据是很庞大的,所以肯定不可能一次性都将数据拉取过来缓存,只能根据blockId逐步拉取,fetchUpToMaxBytes就是实现逐步拉取数据的逻辑,内部实现了对数据拉取的控制。当需要拉取数据的时候fetchUpToMaxBytes就会调用send方法,send方法是对sendRequest的一层封装,除了调用sendRequest之外,就是要记录每个远程地址当前正在传输Block的数量。sendRequest就是使用ShuffleClient从远程地址拉取数据的具体实现了。
为了便于理解,我们根据上面流程图的倒序进行分析。
sendRequest
sendRequest是远程拉取数据的实现。从代码上来看,代码主要分成三个部分:记录状态信息(为fetchUpToMaxByte提供逻辑处理的数据)、生成BlockFetchingListender(调用fetchBlocks的时候需要这个监听器作为回调)以及使用shuffleClient拉取数据。
基本的流程图如下:
对于ShuffleClient的fetchBlocks方法的实现我们稍后会深入分析,但是这里需要意识到的是这个方法拉取的是多个block。
send
send方法就是对sendRequest的包装,除了调用sendRequest方法之外,会记录当前远程地址正在传输Block的个数,也画一个图吧:
就是调用sendRequest,然后设置缓存的值。
fetchUpToMaxBytes
上面我们也简单的提了一下,这个方法就是读取最大限度的数据(但是要整读,不会读取半个block的数据,要么不读,要读取完成的block)。
这个方法的实现其实也比较简单,基本的思路就是在准备发送请求之前,先验证一下是否可以发送,如果可以则调用send方法,如果不可以,则将请求加入到延迟队列中。所以从上面的话中,大家可以了解到,这个数据会操作两个队列 fetchRequests 和 deferredFetchRequests,从代码来看,程序会先从deferredFetchRequest队列中拿请求处理,如果这个队列中没有请求或请求都不符合发送才会去fetchRequest队列中拿请求,实际处理刚开始的时候deferredFetchRequest必然是空的,只有当处理fetchRequest中的请求不符合发送条件的时候才会加入到deferredFetchRequest中,但是这两个队列的数据结构还是不一样的,fetchRequest存放的是FetchRequest(BlockManagerId, Seq[(BlockId, Long)])对象,也就是存储的是BlockManagerId->Block列表的映射关系,BlockManagerId可以简单的理解为存储Block的远程地址;而deferredFetchRequest,存放的是BlockManagerId -> Queue[FetchRequest]的映射,也就是说在添加到deferredFetchRequest的时候根据BlockManagerId进行整合汇总了。
此方法的流程图如下:
从图中,我们也看到了在这个方法对每个地址正在拉取block的数量(numBlocksInFlightPerAddress)、正在拉取数据的数据量(bytesInFlight)和正在拉取数据的请求个数(reqsInFlight)的使用。
在进入fetchUpToMaxBytes方法后,首先会遍历deferredFetchRequests集合中的数据,这里需要注意的是,这个数据根据BlockManagerId进行了汇总,key是BlockManagerId,value是一个FetchRequest的Queue。首先调用isRemoteBlockFetchable方法验证队列是否是可拉取的(主要判断当前最大同时数据拉取量和最大同时请求数)。然后使用isRemoteAddressMaxedOut方法验证要发送的请求是否达到了远程地址最大同时拉取数量。如果验证都符合条件,则将FetchRequest交给send方法来进行请求的发送。
next
next方法是Iterator接口的方法,也是用户用来获取数据的方法。
因为我们已经知道了sendRequest方法的BlockFetchingListener对象在收到数据拉取成功或失败后会向results(LinkedBlockingQueue)中写入一个SuccessFetchResult或FailedFetchResult,如果是SuccessFetchResult,则会在其中包含一个ManagerBuffer,也就是我们想要的数据。所以next方法获取数据就是从results中拉取数据,又因为results是阻塞队列,所以当results中没有数据的时候,就会被take方法所阻塞,直到拿到数据进行处理。
如果从results拿到的数据为FailedFetchResult,则会抛出异常信息,导致拉取失败。
如果从results拿到的数据为SuccessFetchResult,那么我们就可以进行处理了,首先,因为这个block已经拉取完毕了,所以相关计数器(如正在拉取数据的数据量bytesInFlight)的计数需要进行更新。
对相关计数器操作完成,就会从ManagerBuffer中获取输入流,然后调用serializerManager.wrapStream方法进行数据流的加密和压缩。接着会根据配置(数据流确实被加密或压缩了 、数据量较小)来决定是否要将输入流转换为ChunkedByteBufferInputStream。
最后,返回blockId以及使用BufferReleasingInputStream包装的输入流(包装后可以带调用ShuffleBlockFetcherIterator中的清理方法来进行清理工作)。
initialize
上面的介绍的这些方法都是在从远程节点拉取数据。其实ShuffleBlockFetcherIterator也会读取本地的数据,但是在生成ShuffleBlockFetcherIterator的时候会对Block进行拆分。而这一切是从initialize方法开始的。
方法的逻辑很简单,显示注册一个Task完成事件,用来调用cleanup方法进行清理工作。然后调用splitLocalRemoteBlocks()方法,将远程Block和本地Block进行拆分,并将拆分后的远程Block打散放入fetchRequest队列中(供fetchUpToMaxBytes()使用)。接着分别调用fetchUpToMaxBytes()和fetchLocalBlocks()方法。
splitLocalRemoteBlocks
方法的逻辑也相对简单,遍历blocksByAddress(类型为Seq[(BlockManagerId, Seq[(BlockId, Long)]],BlockManagerId代表了一个远程地址,这个地址上存折多个Block,这里以(BlockId,Block的size)来表示)(这里的blocksByAddress是通过mapOutputTracker(通过SparkEnv.get.mapOutputTracker得到)的getMapSizesByExecutorId方法得到的)中的数据。
遍历数据时,会拿到BlockManagerId和这个BlockManager上的Block列表。通过BlockManagerId,能够得到executorId,对这个executorId与本地SparkEnv中的BlockManager中的executor进行比较,就可以判断这个BlockManagerId是否是本地的了(BlockManager中的executorId就相当于BlockManager的唯一ID了)。如果是本地的,将Block列表加入到localBlocks集合中(对于size为0的Block直接丢掉,无需拉取),后面fetchLocalBlocks方法会对这个集合进行处理。对于远程,需要对远程BlockManager中的每个Block进行遍历,这样做的目的是为了拆分FetchRequest(可能会将一个BlockManager中的多个Block分不同的请求来拉取)。
从代码中对于FetchRequest的拆分,我们可以了解到在配置spark.reducer.maxSizeInFlight参数的时候,这个参数的最小值应该为一个Block的大小,否则后面没法发送请求,因为任何一个数据的大小都会超过限制,而FetchRequest的拆分是以Block为单位的,一个FetchRequest最少含有一个Block。
fetchLocalBlocks
fetchLocalBlocks的逻辑很简单,基本流程如下:
此方法就是遍历localBlocks中的数据,并调用本地的BlockManger获取数据,然后将包装为一个FetchResult放到results中,供next方法使用。
如果在获取本地Block时发生异常,则推送results时推送的是FailureFetchResult,否则推送的是SuccessFetchResult。
至此,ShuffleBlockFetcherIterator的处理逻辑就介绍完了。
shuffleWriter到shuffleReader的调用链
我们已经知道了BlockStoreShuffleReader作为ShuffleReader进行数据读取,read方法返回Iterator对象。那么调用这个read方法的流程是什么样的呢?简单的流程图如下:
基本流程就是在执行task的时候,会使用ShuffleWriter来写数据,写数据的时候就会调用RDD的iterator来读取数据。在使用iterator读取数据的时候会根据存储级别来确定调用getOrCompute()方法还是computeOrReadCheckpoint()方法,当RDD的存储级别不为NONE的时候就会调用getOrCompute()方法。但是这两个方法都会调用到computeOrReadCheckpoint()方法,然后调用ShuffleRDD的compute()方法,最终调用到了ShuffleReader的read()方法。所以接下来我们要看看ShuffleWriter的流程。
ShuffleWriter
ShuffleWriter的获取,ShuffleWriter是调用ShuffleManager(SparkEnv.get.shuffleManager)的getWriter方法得到的。调用getWriter方法的时候会传递dep.shuffleHandle作为参数,因为方法中会根据shuffleHandle的类型,生成不同类型的ShuffleWriter。
接下来,我们具体分析一下ShuffleWriter的write方法(以简单的SortShuffleWriter为例)。
SortShuffleWriter.write
此方法的逻辑也比较简单。首先生成ExternalSorter,根据dependency.mapSideCombine来确定生成的ExternalSorter是否含有聚合器(如果为true则含有)。
然后调用sorter.insertAll将read返回的Iterator插入到上面生成的ExternalSorter中。
接着,使用IndexShuffleBlockResolver(其中包含BlockManager,因此可以管理Block),根据shuffleId和partitionId创建shuffle的数据文件,同时创建这个数据文件的临时文件(在数据文件后面加一个随机的后缀)。
然后调用sorter.writePartitionedFile方法,将sorter中的数据写到上面的临时文件中。接着根据临时文件写索引文件,并将临时文件调整为正式文件,通过调用shuffleBlockResolver.writeIndexFileAndCommit方法来实现(此方法中会对数据文件和索引文件进行验证)。
write方法比较简单,对于分类器ExternalSorter,我们在ShuffleReader和ShuffleWriter中都看到了,shuffle过程数据的混洗也是这样完成的吧。
write方法中使用shuffleBlockResolver做了两件事:获取数据文件(getDataFile)和写索引文件(writeIndexFileAndCommit)。接下来我们对这两个方法也一并分析一下。
IndexShuffleBlockResolver.getDataFile
方法的实现很简单吧。就是调用BlockManager中的DiskBlockManager去获取文件,如果没有,则生成一个文件并返回(关于DiskBlockManager操作文件的逻辑可以参考内存分析章节)。
shuffleBlockResolver.writeIndexFileAndCommit
这个方法逻辑稍微复杂一些,先是根据shuffleId和partitionId创建索引文件和索引临时文件。然后根据sorter.insertAll方法返回的文件每个分区的长度,写到索引临时文件中。索引文件指定了数据文件中每个partition的起止offset(sorter.insertAll方法,我们有机会在深入分析)。到这里需要注意,刚刚写的数据文件和索引文件,都是写入到临时文件中的,因为数据文件和索引文件可能已经存在了(因为一个task可能有多个尝试在同时执行),所以接下来就会对索引文件和数据文件(注意这里不是临时文件)进行验证(调用checkIndexAndDataFile方法)。验证成功,则表示有task的其他尝试已经完成了文件的写入,直接将索引临时文件和数据临时文件删除即可;如果验证失败,则将已有的数据文件和索引文件删除,将我们上面生成的索引临时文件和数据临时文件重命名为正式的索引文件和数据文件。
缺少一个流程图。。。。。。TODO
至此,数据的写入逻辑也就介绍完了,另外两种Writer的write实现,有机会再补充。
ShuffleClient
在sendRequest方法中,我们调用了ShuffleClient对象来拉取Blocks。有个细节肯定没有忘记,就是会根据此次请求要拉取数据的大小来决定是否会传递TempShuffleFileManager,这个TempShuffleFileManager有什么用,我们会在分析的时候看到它的作用。另外,还有一个问题:ShuffleClient是如何得到的,具体的实现类是哪个?
从生成ShuffleBlockFetcherIterator是,我们知道ShuffleClient是调用BlockManager的shuffleClient方法得到的。
代码逻辑很简单,根据配置“spark.shuffle.service.enabled”来确定使用ExternalShuffleClient作为ShuffleClient,还是使用BlockTransferService。因为我们开启了此参数,所以我们会针对ExternalShuffleClient进行分析。生成ExternalShuffleClient的时候会需要一个SparkTransportConf
整个过程中所使用的配置
配置参数 | 默认值 | 参数作用 | 使用位置 |
---|---|---|---|
spark.reducer.maxSizeInFlight | 48M | 控制该节点可以同时拉取多大量的数据(所有请求同时拉取数据的字节数) | 1、isRemoteBlockFetchable方法中验证正在拉取的数据的数据量。2、在splitLocalRemoteBlocks方法中,用于拆分FetchRequest(当一个地址上的多个Block的总大小超过该值的五分之一的时候,就把这些Block拆到多个FetchRequest中)。 |
spark.reducer.maxReqsInFlight | Int.MaxValue | 控制节点同时可以发送的请求数,超过这个数的请求,将被放到deferredFetchRequests中 | isRemoteBlockFetchable方法中验证当前正在拉取数据的请求数 |
spark.reducer.maxBlocksInFlightPerAddress | Int.MaxValue | 一个远程节点,可以同时拉取Block的个数 | 1、isRemoteAddressMaxedOut方法中检查某个远程地址上正在拉取的block的数量是否超出最大值。2、在splitLocalRemoteBlocks方法中,当一个地址(BlockManagerId)上拥有的Block个数超过该值时,为了可以提交send方法,需要将这些Block拆分到多个FetchRequest中 |
spark.shuffle.detectCorrupt | true | 如果此参数为true,会将ManagerBuffer得到的输入流转换为ChunkedByteBufferInputStream类型 | 在shuffleBlockFetcherIterator的next方法中 |
spark.shuffle.compress | true | shuffle数据是否进行压缩 | 在next方法调用serializerManager.wrapStream方法时会验证 |
spark.shuffle.service.enabled | false | 是否启用shuffleService | 如果启用了,则生成ExternalShuffleClient对象作为ShuffleClient,否则使用BlockTransferService.我们开启了此参数 |
一些有趣的东西
去哪里可以得到shuffle数据的分布?
答案是MapOutputTracker,通过SparkEnv.get.mapOutputTracker就可以得到MapOutputTracker对象。
比如:mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) 就可以得到shuffleId阶段,根据ExecutorId聚合后的Block信息。