Quick Start

本文是Spark 快速开始的翻译文档,会随着自己的实现进行更新

Spark shell的启动方式

spark提供了一种简单的方法来学习API,那就是Spark的shell。可以以Scala或python的方式来启动shell。
Scala的启动方式:

1
./bin/spark-shell

python的启动方式:

1
./bin/pyspark

spark的主要抽象是项目的分布式集合,被称为Resilient Distributed Database(RDD)。RDD能够根据Hadoop的输入格式(诸如HDFS文件)来创建,或者由其他RDD转换成为RDD。我们根据Spark源码中的README文件来创建一个新的RDD:

1
2
scala > val textFile = sc.textFile("README.md")
textFile:spark.RDD[String]= spark.MappedRDD@2ee9b6e3

RDD有行为和转换两种操作。行为可以返回值,转换将返回新的RDD的指针。下面以一个行为开始操作:

1
2
3
4
scala > textFile.count()// Number of items in this RDD
res0:Long=126
scala > textFile.first()// First item in this RDD
res1:String=#ApacheSpark

下面我们使用一个转换。我们将使用filter转换来获取以一个由文件子集创建的新的RDD:

1
2
scala > val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark:spark.RDD[String]= spark.FilteredRDD@7dd4af09

我们还可以将转换和行为连接在一起:

1
2
scala > textFile.filter(line => line.contains("Spark")).count()// How many lines contain "Spark"?
res3:Long=15

RDD上的更多操作

RDD行为和转换能够被用于更多的复杂操作:

1
2
scala > textFile.map(line => line.split(" ").size).reduce((a, b)=>if(a ]]> b) a else b)
res4:Long=15

第一行用来计算一行中被空格分割字数,创建一个新的RDD。reduce在这个RDD上被调用来计算拥有最大数。map和reduce是Scala的函数字面值,能够使用任何语言特征或Scala/Java库。例如,我们能够在任何决定使用的地方来调用函数。我们将使用Math.max()函数来写代码,使其变得容易理解。

1
2
3
4
5
scala > importjava.lang.Math
importjava.lang.Math
scala > textFile.map(line => line.split(" ").size).reduce((a, b)=>Math.max(a, b))
res5:Int=15

一种常用的数据流模式是MapReduce,在Hadoop中流行使用。Spark能够很容易的实现MapReduce:

1
2
scala > val wordCounts = textFile.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey((a, b)=> a + b)
wordCounts:spark.RDD[(String, Int)]= spark.ShuffledAggregatedRDD@71f027b8

这里,我们组合了flatMap, map和reducebyKey进行转换将每个单词作为一个RDD对(String, Int)来计算每个单词在文件中出现的次数。要在我们的shell中收集单词的数量,我们可以使用收集行为:

1
2
scala]]> wordCounts.collect()
res6:Array[(String, Int)]=Array((means,1),(under,2),(this,3),(Because,1),(Python,2),(agree,1),(cluster.,1),...)

##Caching
Spark同样支持将数据集推送到一个集群范围的内存缓存中。当数据反复被访问的时候,这样很有用,例如我们查询一个非常小的“热门”的数据集或当运行一个像PageRank这样的反复迭代算法时。作为一个例子,我们将我们的linesWithSpark数据集进行缓存:

1
2
3
4
5
6
scala > linesWithSpark.cache()
res7:spark.RDD[String]= spark.FilteredRDD@17e51082
scala > linesWithSpark.count()
res8:Long=19
scala > linesWithSpark.count()
res9:Long=19

使用Spark来研究并缓存一个100行的文本文件看起来很傻。有趣的部分是这些相同的函数可以被用到非常大的数据集上,甚至是跨越数十个或数百个节点的数据集。你还可以通过连接bin/spark-shell到一个集群来交互式的这样做,在programming guide中进行了描述。

Self-Contained Applications(独立的应用程序)

假设我们希望使用Spark API来写一个独立的应用程序。
我们将创建一个非常简单的Spark应用程序,它的名字为SimpleApp.scala。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp{
def main(args:Array[String]){
val logFile ="YOUR_SPARK_HOME/README.md"// Should be some file on your system
val conf =new SparkConf().setAppName("Simple Application")
val sc =new SparkContext(conf)
val logData = sc.textFile(logFile,2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}

注意,这个应用程序应该定义一个main()方法,而不是要继承scala.App。scala.App将不能正确的工作。
这段代码只是计算Spark README文件中包含了“a”的行数和包含了“b”的行数。注意你需要使用本地安装spark的位置来替换你的Spark的home。和之前在Spark shell中不一样的是初始化SparkContext是代码的一部分,Spark shell会帮我们自动生成SparkContext。
我们传递一个SparkConf给SparkContext的构造器,SparkConf中包含了我们应用程序的信息。
我们的应用程序依赖于Spark API,因此我们还需要包含sbt的配置文件,simple.sbt,它解释了Spark的依赖。这个文件同样添加了一个Spark依赖的库。

1
2
3
4
name :="Simple Project"
version :="1.0"
scalaVersion :="2.10.5"
libraryDependencies +="org.apache.spark"%%"spark-core"%"1.6.1"

为了让sbt正确工作,我们需要对SimpleApp.scala和simple.sbt进行目录结构的布局。一旦安排好,我们能够创建包含应用程序代码的JAR包,然后使用spark-submit脚本来运行我们的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp"\
--master local[4]\
target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

Where to Go from Here

恭喜你,你已经执行了第一个Spark 应用程序。
1、要查看API的更深使用,可以在Spark programming guide,或查看Programming Guides菜单来查看其他组件。
2、要在一个集群上进行运行,可以查看deployment overview。
3、最后,Spark包含了很多例子在examples目录(有Scala、Java、Python和R语言的支持)。你可以如下运行他们:
``` bash

#For Scala and Java, use run-example:
./bin/run-example SparkPi

#For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

#For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
``