Core
Partition 分区
在Partition特征中,只定义了index,用来表示分区在父级RDD中索引。
Dependency 依赖
依赖的定义在 org.apache.spark.Dependency类中,从代码中可以看出,Dependency有2个子类,分别代表了2中类型的依赖,分别为:NarrowDependency和ShuffleDependency。其中NarrowDependency有两个子类:OneToOneDependency和RangeDependency。
RDD
RDD是Resilient Distributed Dataset的简称,是Spark中的基本抽象。要实例化一个RDD,需要两个参数:SparkContext和Dependency列表。需要SparkContext是因为SparkContext提供了RDD的一些操作(如生成RDD的id,清理RDD的缓存、缓存RDD等),而Dependency则是因为它表示了RDD的继承关系。
此处可以看到RDD的基本构造方法。
|
|
使用已有的RDD构造新的RDD。
此外,RDD定义了一些抽象,需要子类进行实现:
SparkContext
wordCount分析
了解了一些代码之后,决定从wordCount的案例进行分析,以便了解Spark进行计算时的具体操作。
因为在Spark中,transform是延迟执行的,也就是说,action之前的transform只有在遇到后面的action之后,才开始执行。因此上面的代码就要从count()开始。
|
|
最终执行Job落到了 dagScheduler 对象身上
dagScheduler 的runJob方法中调用submitJob来提交任务
|
|
最终,通过eventProcessLoop的post将任务提交到了任务执行队列。 这里需要注意的一个问题,加入任务队列的是一个 JobSubmitted对象。为什么要如此处理呢?需要从eventProcessLoop对象入手。
eventProcessLoop是DAGSchedulerEventProcessLoop的实例
查看 DAGSchedulerEventProcessLoop 的定义
DAGSchedulerEventProcessLoop继承于 EventLoop,EventLoop的内部有一个EventThread的线程,该线程从事件队列中循环获取数据
DAGSchedulerEventProcessLoop的doOnReceive方法的定义如下
当从事件队列中获取到数据后,如果JobSubmitted对象,则调用dagScheduler的handleJobSubmitted方法。由此也知道了为什么eventProcessLoop.post()推的数据是 JobSubmitted 对象了。
再看handleJobSubmitted方法的定义:
由此看到了创建步骤(createResultStage(finalRDD, func, partitions, jobId, callSite))和提交步骤(submitStage(finalStage))的代码。
???上面的代码分析过程中,我们知道整个transform的触发点是从action(count())开始的,而count是最后一个RDD(map生成的那个RDD)的方法。map对应的RDD是MapPartitionsRDD。在MapPartitionsRDD的compute方法中,而compute方法中使用的迭代器是从最开始的那个RDD开始的( firstParent[T].iterator(split, context) )。
创建步骤 createResultStage(finalRDD, func, partitions, jobId, callSite)