Spark Source Code Study

Core

Partition

The Partition trait defines only an index, which identifies the partition's position within its parent RDD.
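
For reference, the trait itself is tiny. A simplified sketch of org.apache.spark.Partition (details may vary across Spark versions):

trait Partition extends Serializable {
  // the partition's index within its parent RDD
  def index: Int

  // partitions are often used as map keys, so hashCode is based on the index
  override def hashCode(): Int = index
}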

Dependency

Dependencies are defined in the org.apache.spark.Dependency class. From the code we can see that Dependency has two subclasses representing two kinds of dependencies: NarrowDependency and ShuffleDependency. NarrowDependency in turn has two subclasses: OneToOneDependency and RangeDependency.
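
A simplified sketch of the narrow-dependency side of that hierarchy (ShuffleDependency and RangeDependency are omitted, and details may vary across Spark versions):

abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  // which partitions of the parent RDD a given child partition depends on
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // child partition i depends on exactly parent partition i
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}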

RDD

RDD is short for Resilient Distributed Dataset and is the basic abstraction in Spark. Instantiating an RDD requires two arguments: a SparkContext and a list of Dependencies. The SparkContext is needed because it provides several RDD-related operations (generating RDD ids, unpersisting RDDs, caching RDDs, and so on), while the Dependency list describes the RDD's lineage.

abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

This shows RDD's primary constructor.

def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))

This auxiliary constructor builds a new RDD from an existing one, with a one-to-one dependency on that parent.

In addition, RDD declares several abstract methods that subclasses must implement (a hypothetical custom RDD illustrating them follows the snippet below):

def compute(split: Partition, context: TaskContext): Iterator[T]   // computes the given partition and returns an iterator over its elements
protected def getPartitions: Array[Partition]                      // returns all partitions of this RDD
protected def getDependencies: Seq[Dependency[_]] = deps           // returns this RDD's dependencies on its parent RDDs
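
As an illustration of these hooks, here is a hypothetical custom RDD that exposes an in-memory sequence split into a fixed number of partitions (SeqRDD and everything inside it are made up for this sketch; it is not part of Spark):

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class SeqRDD[T: ClassTag](sc: SparkContext, data: Seq[T], numSlices: Int)
  extends RDD[T](sc, Nil) {                     // Nil: this RDD has no parent dependencies

  private case class SeqPartition(index: Int) extends Partition

  override protected def getPartitions: Array[Partition] =
    (0 until numSlices).map(i => SeqPartition(i): Partition).toArray

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    // hand back the slice of `data` that belongs to this partition
    val i = split.index
    val start = i * data.length / numSlices
    val end = (i + 1) * data.length / numSlices
    data.slice(start, end).iterator
  }
}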

SparkContext

wordCount Analysis

Having gone through some of the code, let's analyze a wordCount-style example to see exactly what Spark does when it runs a computation.

val sc = new SparkContext("master", "testApplication")
sc.textFile("path", 5).map(_ => 1).count()

In Spark, transformations are lazy: the transformations that precede an action are only executed once that action is reached. The analysis of the code above therefore has to start from count().
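
To make the laziness concrete, a hedged illustration (assuming a working SparkContext and an existing input path):

val lines = sc.textFile("path", 5)   // no job runs yet; only the lineage (HadoopRDD -> MapPartitionsRDD) is recorded
val ones  = lines.map(_ => 1)        // still no job: another MapPartitionsRDD is appended to the lineage
println(ones.toDebugString)          // prints the lineage without executing anything
val n = ones.count()                 // the action: this is where SparkContext.runJob is finally invoked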

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
// Utils.getIteratorSize (shown in a later code block) simply counts the number of elements in the
// iterator it is given (presumably the iterator over one RDD partition).
// The runJob overload called here is defined as:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
// which delegates to
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
// which delegates to
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}
// and finally to
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = {
  ...
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
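
For reference, Utils.getIteratorSize does nothing more than walk the iterator and count its elements; its logic is roughly:

def getIteratorSize[T](iterator: Iterator[T]): Long = {
  var count = 0L
  while (iterator.hasNext) {
    count += 1L
    iterator.next()
  }
  count
}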

Running the job ultimately falls to the dagScheduler object:

def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ...
}

In dagScheduler's runJob, submitJob is called to submit the job:

def submitJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = {
  ...
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
  waiter
}

Finally, eventProcessLoop.post puts the job onto the event queue. One detail worth noting: what gets enqueued is a JobSubmitted object. Why is it handled this way? To answer that we need to look at the eventProcessLoop object, which is an instance of DAGSchedulerEventProcessLoop:

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

Looking at the definition of DAGSchedulerEventProcessLoop:

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging

DAGSchedulerEventProcessLoop extends EventLoop, and EventLoop internally runs an EventThread that keeps taking events off the event queue in a loop (a stripped-down sketch of the whole pattern follows the snippet below):

while (!stopped.get) {
  val event = eventQueue.take()
  try {
    onReceive(event)
  } catch {
    ...
  }
}
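
The whole producer/consumer pattern can be summarized in a few lines. A stripped-down sketch (SimpleEventLoop is an illustrative name, not Spark's class; the real EventLoop also interrupts the thread on stop and has error handling):

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        val event = eventQueue.take()   // blocks until an event is available
        onReceive(event)
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = stopped.set(true)

  // post() only enqueues; the event thread picks the event up asynchronously
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}

This is why what gets posted is a plain event object such as JobSubmitted: the queue only carries events, and the consumer thread decides how to handle each of them.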

The doOnReceive method of DAGSchedulerEventProcessLoop is defined as follows:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ...
}

When an event taken from the queue is a JobSubmitted object, dagScheduler.handleJobSubmitted is called on it. This also explains why what eventProcessLoop.post() pushes is a JobSubmitted object.

Now look at the definition of handleJobSubmitted:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}

Here we can see both the stage-creation step (createResultStage(finalRDD, func, partitions, jobId, callSite)) and the stage-submission step (submitStage(finalStage)).

??? From the analysis above we know that the trigger for the whole chain of transformations is the action (count()), which is a method on the last RDD in the chain (the one produced by map). The RDD behind map is MapPartitionsRDD, and in MapPartitionsRDD's compute method the iterator being consumed is obtained from its parent RDD ( firstParent[T].iterator(split, context) ), so evaluation walks back up the lineage toward the very first RDD.
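
For reference, MapPartitionsRDD's compute is essentially a one-liner; roughly (f wraps the user's map function, and details vary by Spark version):

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

iterator() either serves the partition from cache/checkpoint or calls compute on the parent, so the call chain recurses from the final RDD back up the lineage until it reaches an RDD that can actually produce data (for example a HadoopRDD reading from HDFS).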

Creation step: createResultStage(finalRDD, func, partitions, jobId, callSite)