spark-2.11-storage

Spark存储管理在Spark执行application时担负这数据的传递、存储的重要职责。因此了解Spark的存储机制对于理解Spark的数据操作和性能的优化有着很重要的作用,本文以粗粒度方式来探究一下Spark的存储(最主要是内存)。由于在Spark中任务的执行、数据的传输均是发生在Executor端,因此本文只关注Executor端的存储操作。

1、内存模式 On-heap与Off-heap

因为Executor是运行在JVM上的,所以Executor最直接的就是操作On-heap内存,但是除此之外,Spark还引入了Off-heap内存的使用,使Spark可以直接操作JVM之外的系统内存,并对此操作进行了优化。

该图中关于task共享Executor内存的较少我们将在后续介绍,这里只需要关注On-heap和Off-heap的区别即可。

1.1、堆内内存 On-heap

堆内内存的大小通过参数 –executor-memory 或 配置spark.executor.memory来控制。Spark对堆内内存的管理是一种计数上的管理,因为对对象的创建和销毁实际是由JVM来具体操作的,Spark无法准确的控制这些,因此它只是从计数(空间使用量)的角度来管理堆内内存。

1.2、堆外内存 Off-heap

为了进一步优化内存的使用进而提高Shuffle的执行效率,Spark引入了堆外内存(Off-heap),使得Spark可以使用Executor上的系统内存。默认堆外内存是不可用的,可以通过 spark.memory.offHeap.enabled来进行开启。通过JDK Unsafe API,Spark能够直接操作系统内存,因此可以精确的控制堆外内存的申请和销毁。

1.3、内存的统一管理

Spark对于内存的管理使用统一的抽象接口MemoryManager。它负责Spark的内存申请、释放以及不同用途内存之间的转换。对于内存管理的实现,Spark主要基于两种内存管理:静态内存管理模式(StaticMemoryManager)和统一内存管理模式(unifledMemoryManager)。从Spark 2.0开始,默认使用的内存管理模式为 统一内存管理模式(unifledMemoryManager)。如果想要使用静态内存管理模式,可以将 spark.memory.useLegacyMode 配置设置为true。

2、内存的用途分类

从内存的使用来看,Spark对内存的使用主要在两个方面:数据执行和数据存储,所以我们堆内存的划分也就分为执行内存和存储内存。执行内存主要用于shuffle、join、sorts和aggregation的消耗,而存储内存主要用于数据的缓存和集群内数据的传输。在Spark内存管理中,执行内存和存储内存的大小是通过参数 spark.memory.storageFraction 来指定的,执行内存和存储内存之和就是Executor的可用内存(对于On-heap内存,含有other部分)。

2.1、堆内内存中执行内存与存储内存的计算

对于堆内内存,我们首先要介绍几个概念:

概念 解释/取值
系统内存 可以理解为JVM的内存,如果设置了spark.testing.memory,则使用,否则Runtime.getRuntime.maxMemory
系统预留内存 系统预留的内存,依次spark.testing.reservedMemory > spark.testing则为0 取值,默认为300(300 1024 1024)
最小系统内存 系统预留内存的1.5左右
可用内存 系统内存 - 预留系统内存
最大内存 可以被Spark使用的内存(执行内存和存储内存之和)。可用内存 * spark.memory.fraction(默认为0.6,我们集群的配置为0.7)

最大内存又分成了存储内存和执行内存。存储内存在最大内存中的占比通过spark.memory.storageFraction(默认为0.5)配置来指定,其余的为执行内存。这里额外介绍一个细节,如果 spark.executor.memory配置的值小于 最小系统内存,executor是无法启动的。

2.2、堆外内存中执行内存和存储内存的计算

堆外内存的大小是通过spark.memory.offHeap.size配置指定的。堆外内存比较简单,它不存在其他部分的内存分配,内部直接分为存储内存和执行内存。堆外存储内存所占比例同样通过配置spark.memory.storageFraction(默认0.5)来指定。

对于堆内存储内存和堆外存储内存,彼此之间是相互独立的,执行内存也是如此。

2.3、存储级别

我们已经知道了内存模式分为Off-heap和On-heap,而根据内存的用途有分为存储内存和执行内存。因此对于数据的存储,Spark从一下几个因素定义了存储级别(StorageLevel):

  1. 磁盘/内存
  2. 堆内/堆外
  3. 序列化/不序列化
  4. 有副本/没有副本

有了存储级别,就能够明确的说明数据存储的位置、数据存储的方式以及数据存储的个数。

3、静态内存管理与统一内存管理

最初Spark的采用的是静态内存管理,在2.0的版本中,Spark开始默认使用统一内存管理来进行内存管理。静态内存管理和统一内存管理的区别,可以简单的从执行内存和存储内存能否相互借用来区别。因为我们系统也是使用默认的统一内存管理来对内存进行管理,顾暂时不对静态内存管理进行理解。

3.1、统一内存管理

相对于静态内存,统一内存增加了动态占用机制的优化,其规则如下:

1、通过配置项spark.memory.storageFraction,对存储内存和执行内存进行基本值的划分。
2、当存储内存不够、执行内存充足时,可以增加存储内存的容量,减少执行内存的容量。反之亦然。
3、当存储不够且执行也不充足时,存储数据落盘。执行不够且存储也不够时,执行阻塞或失败。
4、当存储占用执行时,执行可要求存储归还,存储不够的可以落盘;当执行占用存储时,存储无法要求执行归还,只能删除数据或落盘。

3.2 动态占用机制的实现

上面我们提到了统一内存管理的动态占用机制,它可以更加充分的使用内存,那么这种机制是如何实现的呢?上面我们也说过,Spark其实是无法精确操作内存的,而是使用了类似计数管理的方式来实现的。
因此,在Spark的底层实现中,它为每种内存都创建了与之对应的内存池(执行内存池和存储内存池,但是存储模式又分为堆内和堆外,所以共有四种内存池),内存池记录了对应内存的使用量和容量。

3.2.1 MemoryManager

对于内存池的封装,是由 MemoryManager来实现,在其内部维持着四种内存池的引用。

其中只有相同内存模式的不同内存之间可以动态占用,如:OnHeapStorageMemoryPool只可以和 OnHeapExecutionMemoryPool 相互占用。另外需要注意的是,内存的总大小(执行内存和存储内存之和)一旦确定是无法修改的,虽然可以调整某个内存的大小,但是总的大小是不变的。
MemoryManager(UnifiedMemoryManager)主要的职责就是根据需要调整各自内存池的容量、计算各自内存池的当前使用量以及分配使用量。

4、存储内存的管理

存储内存最主要的使用就是数据缓存(RDD进行持久化保存)和集群内的数据传输(数据的广播)。而且我们前面也介绍了存储级别,还需要介绍一个其他的概念:Block。对于Block的理解,可以先简单的将数据的parition理解为一个Block,但是在存储过程中Block是由类型的(通过BlockId进行验证):
pimg_5c07c4739522a.png
从上图可以看出,BlockId由众多的子类,而属于哪种类别的BlockId,就是通过字符串模式匹配来决定的。
这里我们为什么要介绍Block呢?因为数据缓存就是以Block方式存储的。
在Spark中Storage模块负责Spark在计算过程中产生的数据,对数据的读写进行了统一的封装(包括从内存、磁盘、本地、远程)。在代码架构上,BlockManager分为Master和Salve。Dirver上运行的是Master,Executor上运行的是Slave,两者之间相同通信对数据块(Block)进行管理。

4.1、 具体的实现

在MemoryStore中,保持一个entries对象,它是一个LinkedHashMap[BlockId, MemoryEntry[_]]对象。MemoryEntry是一个接口,它有两个实现:DeserializedMemoryEntry 和 SerializedMemoryEntry,分别处理非序列化数据和序列化数据的保存。当由此,也就明白了存储级别(StorageLevel)中序列化和非序列化的意义了。当数据向内存中缓存数据时,其实就是将数据保存到enties中,但是与普通生成兑现不太一样,他会以连续的内存来保存,也就是说一个Block内的数据,从内存上来看是连续存储的(序列化的数据很好理解,序列化之后,对象就是一串字节数,但是对于非序列化的对象,其内部会有一个转换操作)。

5、执行内存的管理

执行内存最主要的使用就是shuffle、sorts、aggregate等操作的时候被使用。而排序和聚合其实都是以shuffle的结果来进行操作然后写出数据,所以我们先从Shuffle的存储进行分析。

5.1、 Shuffle执行内存的使用

shuffle操作是RDD之间的一种数据转换,从上一个RDD中读取,写入到下一个RDD中,因此我们将从读写两个方面来分析一下:

5.1.1、 shufflerReader

Spark的shuffle操作是由ShuffleManager(由子类SortShuffleManager进行实现)进行操作的。ShuffleManager要读取数据就需要获取Reader,从而得到BlockStoreShuffleReader,BlockStoreShuffleReader调用read()方法进行数据读取。ShuffleManager可以通过配置项spark.shuffle.manager进行设置(默认为sort,可选的值有sort和tungsten-sort):

spark.shuffle.manager的取值 所代表的类
sort org.apache.spark.shuffle.sort.SortShuffleManager
tungsten-sort org.apache.spark.shuffle.sort.SortShuffleManager

这里需要引入以概念:ShuffleClient,它是实际拉取数据的客户端。在Spark内部存在两种ShuffleClient:BlockTransferService和ExternalShuffleClient。如果配置项 spark.shuffle.service.enabled 为true(默认为false),则启用ExternalShuffleClient(比如我们的集群,就启用了这个配置)。
在生成ExternalShuffleClient的需要SparkTransportConf,该配置有两个比较重要的配置:

配置项 意义 取值
spark.shuffle.io.serverThreads stage之间TransServer的线程数 用户设定,默认与可用的core的数量相同
spark.shuffle.io.clientThreads 用户设定,默认与可用的core的数量相同

可用core的数量为:用户指定core数、运行时可用core数 以及 数字8 中最小的那个值(如果用户指定的数不是0,则使用用户指定的数和8中最小的值,否则就是可用core数和8中最小的那个)。我们集群没有对此进行配置,因此会使用JVM可用的core数进行设置,但是不会超过8个。
ExternalShuffleClient中重要的方法就是fetchBlock方法。在fetchBlock方法,会创建连接到目标host和port的TransportClient,然后利用这个client生成OneForOneBlockFetcher来拉取指定executor上(通过参数execId)指定的block(通过blockIds指定)。>_< 到这里都没有看到内存的使用。。。醉了
突然一个不小心,原来OneForOneBlockFetcher中使用了一个参数 TempShuffleFileManager,它是一个接口,实现类为 ShuffleBlockFetcherIterator。这个类中有一个方法 createTempShuffleFile()。那么我们就看看,OneForOneBlockFetcher 是否将数据写到了临时文件吧(山路十八弯呀)。通过跟踪代码,果然是将远程的数据写入到个临时文件中。但是当数据写完之后,这个文件会被用来生成一个ManagedBuffer(具体类为FileSegmentManagerBuffer),对于这个ManagedBuffer的操作会交给listener进行处理,这个linstener就又指向了ShuffleBlockFetcherIterator中的 BlockFetchingListener,调用它的onBlockFetchSuccess方法。在新的方法中,ManagerBuffer作为一个SuccessFetchResult对象被推送到results中(一个LinkedBlockingQueue队列)。我们已经知道这个方法是在 ShuffleBlockFetcherIteraotr中,而这个类本身就是Iterator,所以对上面的队列的读取,就发生在Iterator的next()方法中。继续回到生成ShuffleBlockFetcherIteraotr的地方BlockStoreShuffleReader.read()中。在read方法中,又继续对数据进行了处理,怎么处理的呢,当然从字节流被转换为对象(进行解序列化操作),但是read返回的依旧是一个迭代器(Iterator)。因为shuffle操作肯定对需要一种聚合手段,这里采用了ExternalAppendOnlyMap进行聚合操作。如果还需要排序,则使用进一步使用ExternalSorter对象进行操作。这两个类好复杂,慢慢在看(也就是这两对象的操作会占用内存)。

5.1.2、ShuffleWriter

shuffleWriter的调用是在ShuffleMapTask的runTask中触发的(这也很好理解,只要在执行task结束的时候才需要写数据呀),而且我们也知道,对于Task只分为两种类型ShuffleMapTask和ResultTask,因为是了解shuffle部分,所以我们只关注ShuffleMapTask,至于ResultTask以后再继续。
至于获取ShuffleWriter,是根据ShuffleDependency中shuffleHandle的类型所有决定的,不同的handler会生成不同的Writer:

handler类型 与之对应的writer
unsafeShuffleHandle UnsafeShuffleWriter
bypassMergeSortHandle BypassMergeSortShuffleWriter
其他 SortShuffleWriter

我们选择一个较为简单的Writer吧,就看SortShuffleWriter。对于Writer来说,最重要的方法必然是write。于是我们就在方法中看到了获取数据文件、生成BlockId、写文件的操作。
pimg_5c09e4c7151a3.png
从目前来看,shuffle的写操作,写的是文件,而非内存,但是从文档或其他人的文章都提到有写内存的,应该是我还没有看到,会后续补充。

5.2、task执行内存的分配

Executor内部是以多线程的方式执行task,要启动一个task其实就是将TaskRunner放到Executor内部的线程池中执行。既然,task是在Executor中运行,多task在运行期间,执行内存是如何分配的呢?Spark在执行内存池中维持了一个HashMap用来记录每个task所占用的内存。每个task允许使用的内存范围为 maxPoolSize/2N ~ maxPoolSize/N(N为当前活跃的Task数, maxPoolSize是执行内存池的最大空间),注意该限制只是在申请资源的时候验证,当申请资源的时候,如果可以分配给task的内存小于最小值,则会使申请资源的操作进入等待状态,等到有其他任务释放内存的时候,会被再次唤醒。
pimg_5c08aca616f57.png