spark_2.3.1_QuickStart

Quick Start

本指南快速的介绍如何使用Spark。我们将通过Spark的交互式shell(用Python或Scala)首先引入API,然后展示如何用Java、Scala和Python写application。
要遵循这个指南,首先需要从Spark的网站上下载Spark包。因为我们不使用HDFS,因此你可以现在任何版本的Hadoop。
注意,在Spark 2.0之前,Spark的主要程序接口是Resillent Distributed Dataset(RDD)。在Spark 2.0之后,RDD被Dataset所代替,Dataset类似于RDD的强类型,但是底层有更佳丰富的优化。RDD接口仍然被支持,你可以在RDD programming guide。然而,我们高度推荐你使用Dataset,它比RDD有更好的性能。查看SQL programming guide 以获取更多关于Dataset的详细信息。

Interactive Analysis with the Spak Shell

Basics

Spark的shell提供了简单的方式来学习API,以及一种强大的工具来交互式的分析数据。可以通过Scala(它运行在Java虚拟机上,因此它是学习已有Java库的很好方式)或Python来使用。通过在Spark目录下运行如下脚本来启动:

1
./bin/spark-shell

Spark的主要抽象是一个名为Dataset的分布式项目(数据条目–一条条的数据)集合。Dataset可以通过Hadoop InputFormates(如HDFS文件)来创建,或者由其他Dataset来转换。我们根据Spark源目录下README文件中的文本来创建一个新的Dataset:

1
2
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

通过调用一些action,你可以直接冲Dataset获取值,或者将这个Dataset转换为另一个新的Dataset。对于更多的细节,请查看API doc

1
2
3
4
5
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

现在,我们将这个Dataset转换为一个新的。我们调用filter,将会返回一个包含文件子集合的新的Dataset。

1
2
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我们可以将转换和action串联在一起:

1
2
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

More on Dataset Operations

Dataset的转换和action可以被用于更加复杂的计算。假设我们要找出含有打你最多的一行:

1
2
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

它首先将一个行映射为一个数值,这创建了一个新的Dataset。reduce在Dataset上被调用,用来找到最大的数。map和reduce的参数是Scala的函数(闭包),也可以使用任何语言的特性或Scala/Java库。例如,我们在任意地方调用函数的声明(引入)。我们将使用Math.max()函数来使代码更加容易理解:

1
2
3
4
5
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一个常用的数据流是MapReduce。Spark能够很轻松的实现MapReduce流:

1
2
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,我们调用flatMap将行的Dataset转换为一个单词的Dataset,接着利用groupbyKey和count的组合来计算每个单词在文件中出现的次数(String, Long对)从而生成一个新的Dataset。要在shell中收集单词的数量,我们可以调用collect:

1
2
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Pyhon,2), (agree,1), (cluster.,1), ...)

Caching

Spark还支持将数据集合缓存到集群端内存缓存中。这在数据被反复访问时非常有用,例如当查询一个非常热门的数据集时,又或是在运行一个类似PageRank这样的迭代算法时。作为一个简单的例子,我们将linesWithSpark数据进行缓存:

1
2
3
4
5
6
7
8
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15

使用Spark来分析并缓存一个100行的文本开起来很愚蠢。有意思的是,这些相同的函数可以被用在非常大的数据集上,即使它们跨越数十个甚至数百个节点。你可以通过连接bin/spark-shell到一个集群来进行交互式操作,就像RDD programming guide中描述的。

Self-Contained Applications

假设我们想要使用Spark API写一个自包含的application。我们将使用Scala(利用sbt)、Java(利用Maven)和Pyton(利用pip)来实现一个简单的application。
这里我们将使用Maven来构建一个application JAR,其他类似的构建系统也可以。
我们将创建一个非常简单的Spark application,SimpleApp.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}

这段代码用来计算Spark README文件中包含’a’的行数,和包含’b’的行数。注意你需要将YOUR_SPARK_HOME替换为Spark的安装位置。和之前使用Spark shell不同,Spark shell会初始化它自己的SparkSession,而在代码中初始化SparkSession是程序的一部分。
要构建这个程序,我们还需要写一个Maven的pom.xml文件,在这个文件中列出Spark的依赖。注意Spark的依赖和Scala的版本要对应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
</project>

我们根据规范列出了Maven的目录结构:

1
2
3
4
5
6
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在我们可以使用Maven进行打包,并使用./bin/spark-submit来执行它。

1
2
3
4
5
6
7
8
9
10
11
12
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Where to Go from Here

恭喜你运行了自己的第一个Spark application!

对于API的更深了解,可以从RDD programming guideSQL programming guide或者查看 ‘Programming Guides’菜单来了解其他组件。
想要在集群上运行application,去deployment overview
最后,Spark在examples目录中包含了一些例子(Scala, Java, Python, R)。你可以如下运行它们:

1
2
3
4
5
6
7
8
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R