Spark 2.3.1 Spark SQL DataFrames and DatasetsGuide

Spark SQL, DataFrames and Dataset Guide

Overview

Spark SQL是一个用于结构化数据处理的Spark模块。与Spark RDD API不同,由Spark SQL提供的这些接口在结构化数据和结构化计算执行方面提供了更多信息。在内部,Spark SQL使用了这个额外信息来执行额外的优化。有几种与Spark SQL交互的方法,包括SQL和Dataset API。当计算一个结果时,相同的计算引擎会被使用,与你执行计算使用的API/语言无关。这种统一意味着开发者能够轻松在那些提供更加原始的方式处理给定转换的不同API之间进行来回切换。
本篇中所有例子使用的样例数据包含在Spark中,并能够使用spark-shell、pyspark shell或sparkR shell来运行。

SQL

Spark SQL的一种用法时执行SQL查询。Spark SQL还能够被用来从Hive实例中读取数据。关于如何配置这个特性,请参考Hive Tables。当在另一种编程语言中执行SQL时,结果会作为一个Dataset/DataFrame来返回。你还能够使用command-lineJDBC/ODBC的方式与SQL接口进行交互。

Datasets and DataFrames

一个Dataset就是一个分布式数据集。Dataset作为一个新接口在Spark 1.6中被添加,它提供了RDD的优点(强类型、能够使用强大的lambda函数)和Spark SQL的优化执行引擎的有点。一个Dataset能够根据JVM对象来构造,然后使用函数转换(map、flatMap、filter)进行操作。Dataset的API在Scala和Java中时可用的。Python还不支持Dataset API。但是因为Python的动态特性,Dataset API的很多优点已经可用了(例如你可以很自然的通过名称来访问某一行的一个字段 row.columnName)。对于R语言也是如此。
一个DataFrame是一个带有列名的数据集。它在概念上等同于关系数据库中的一个表或者一个是R语言或Python语言中data frame,但是底层具更加优化。DataFrame可以根据各种资源进行构建,例如:结构化的数据文件、Hive中的表、外部数据库以及已经存在的RDD。DataFrame API在Scala、Java、Python和R语言中都可用。在Scala和Java中,一个DataFrame相当于一个有很多行的Dataset。在Scala API中,DataFrame相当于一个Dataset[Row]类型。而在Java API中,用户需要使用Dataset来表述一个DataFrame。
在本文中,我们将经常引用Scala/Java由有Row组成的Dataset来表述DataFrame。

Getting Started

Starting Point: SparkSession

Spark中,所有功能的切入点是SparkSession类。要创建一个基本的SparkSession,只需要使用SparkSession.builder()

1
2
3
4
5
6
7
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();

在Spark库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”目录下,查看完整的示例代码。
SparkSession是Spark 2.0的内置功能,用于提供Hive特性,包括用来写HiveQL查询、
访问Hive UDFs已经从Hive表中读取数据。要使用这些特性,你不需要配置Hive。

Creating DataFrames

使用SparkSession,application能够从一个已经存在的RDD、一个Hive表或Spark data sources来创建DataFrame。
作为一个例子,下面的代码根据一个JSON文件中的内容来创建一个DataFrame:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的代码,请查看Spark库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。

Untyped Dataset Operations(aka DataFrame Operations)

在Scala、Java、Python和R语言中,DataFrames针对不同的语言提供不同的结构化数据操作。正如上面提到的,在Spark2.0中,在Scala和Java的API中,DataFrames是以Dataset来表述的。这些操作也被称为“无类型转换”,与强类型转换的Scala/Java Dataset的类型形成对比。
这里,我们展示了使用Dataset进行结构化数据处理的基本示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+

完整的样例代码,查看Spark库的examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java。
在Dataset上能够执行的操作类型列表,可以查看API Document
除了简单的列引用和计算外,Dataset还有一个丰富的函数库,包括字符串的操作、日期的计算以及常用的数学操作等。完整的列表可以在DataFrame Function Reference找到。

Running SQL Queries Programmatically

SparkSession上的sql函数使application能够执行SQL查询,并返回一个Dataset作为结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的代码,请查看Spark库中的 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Global Temporary View

在Spark SQL中,临时视图是session范围的,将会伴随着创建它的那个session的终止而消失。如果你想要跨session共享一个临时视图,并让它存活到application终止,你可以创建一个全局临时视图。全局视图与一个名为‘global_temp’的由系统保护的数据库进行绑定,我们必须使用这个特殊的名字来引用它,如:SELECT * FROM global_temp.view1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的代码,请查看“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。

Creating Datasets

Dataset与RDD类似,不同的是它没有使用Java序列化或Kryo,它们使用了一个特殊的Encoder来序列化对象,以便这些对象的处理或跨网络传输。虽然encoder和标准序列化器都能够将一个对象转换为字节,encoder是动态编码产生的,并且使用一种格式来允许Spark执行很多操作(filtering, sorting 和 hashing),而不需要讲字节反编译为对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

完整的示例,请查看 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Interoperating with RDDs

Spark SQL支持两种不同方法来将存在的RDD转换为Dataset。第一种方法是使用反射来推导包含特殊类型对象的RDD的模式。这种反射的方法代码更加简单,而且如果在你写Spark application时已经知道了模式时,工作的会很好。
第二种方法是通过一个程序接口来创建Dataset,这个程序接口允许你构建一个模式,并且将它应用到一个已经存在的RDD上。但是这个方法比较冗长,它允许你只有在运行时才知道列和列类型时来构造Dataset。

Inferring the Schema Using Reflection

Spark SQL支持自动将一个JavaBean的RDD转换为一个DataFrame。BeanInfo使用反射机制获得,定义了表的模式。当前,Spark SQL不支持那些包含了Map类型字段的JavaBean,但是对于嵌套的JavaBean以及嵌套了List或Array类型的字段给予了充分的支持。你可以通过创建一个实现了Serializable接口以及为所有字段生成getter和setter方法的类来创建一个JavaBean。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

完整的代码,请查看 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Programmatically Specifying the Schema

当JavaBean无法提前定义时(例如,记录的结构被编码为一个字符串,或者一个文本数据集将被解析,但是其中的字段可能根据不同的用户而不一样),Dataset能够通过三个步骤来创建。

1、根据原生的RDD创建一个RDD
2、创建一个与第一步骤RDD中Row结构匹配的StructType来描述的模式。
3、通过由SparkSession提供的createDataFrame方法,将这个模式应用到RDD

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+

完整的示例,请查看 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 。

Aggregations

内置的DataFrame函数提供了常用的聚合操作,如count()、countDistinct()、avg()、max()、min()等。然而这些函数是为了DataFrame设计的,Spark SQL同样由类型安全的版本,以便其中一些被用到Scala和Java的强类型Dataset。此外,Spark没有限制用户预定义聚合函数,可以自己来创建聚合函数。

Untyped User-Defined Aggregate Functions

用户要实现无类型聚合函数,则需要继承UserDefinedAggregateFunction抽象类。例如,你一个用户自定义的平均数函数,看起来像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public static class MyAverage extends UserDefinedAggregateFunction {
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
// Data types of input arguments of this aggregate function
public StructType inputSchema() {
return inputSchema;
}
// Data types of values in the aggregation buffer
public StructType bufferSchema() {
return bufferSchema;
}
// The data type of the returned value
public DataType dataType() {
return DataTypes.DoubleType;
}
// Whether this function always returns the same output on the identical input
public boolean deterministic() {
return true;
}
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0L);
buffer.update(1, 0L);
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
// Calculates the final result
public Double evaluate(Row buffer) {
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
}
// Register the function to access it
spark.udf().register("myAverage", new MyAverage());
Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+

查看完整示例,请参考 examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java 。

Type-Safe User-Defined Aggregate Functions

强类型Dataset的用户自定义聚合围绕着Aggregator抽象类来解决。例如,一个类型安全的用户自定义平均数看起来是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
public static class Employee implements Serializable {
private String name;
private long salary;
// Constructors, getters, setters...
}
public static class Average implements Serializable {
private long sum;
private long count;
// Constructors, getters, setters...
}
public static class MyAverage extends Aggregator<Employee, Average, Double> {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
public Average zero() {
return new Average(0L, 0L);
}
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
public Average reduce(Average buffer, Employee employee) {
long newSum = buffer.getSum() + employee.getSalary();
long newCount = buffer.getCount() + 1;
buffer.setSum(newSum);
buffer.setCount(newCount);
return buffer;
}
// Merge two intermediate values
public Average merge(Average b1, Average b2) {
long mergedSum = b1.getSum() + b2.getSum();
long mergedCount = b1.getCount() + b2.getCount();
b1.setSum(mergedSum);
b1.setCount(mergedCount);
return b1;
}
// Transform the output of the reduction
public Double finish(Average reduction) {
return ((double) reduction.getSum()) / reduction.getCount();
}
// Specifies the Encoder for the intermediate value type
public Encoder<Average> bufferEncoder() {
return Encoders.bean(Average.class);
}
// Specifies the Encoder for the final output value type
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+

完整的示例,请看 examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java 。

Data Sources

Spark SQL通过DataFrame接口支持多种数据源的操作。DataFrame能够使用关系转换进行操作,也可以被用来创建一个临时视图。将DataFrame注册为一个临时视图,将允许你在视图的数据上运行SQL查询。这一章节描述了使用Spark Data Sources加载和保存数据的一般方法,然后介绍内置数据源可用的详细参数。

Generic Load/Save Functions

最简单的格式,默认数据源(默认是parquet, 除非通过spark.sql.soiurces.default配置修改过)将被用于所有操作。

1
2
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

查看完整示例,请参考 examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java 。

Manually Specifying Options

你还可以手动指定想要使用的数据源,以及传递给数据源任何额外的参数。数据源可以通过它的完整限定名(如:org.apache.spark.sql.parquet)来指定,但是对于内置的数据源,你也能够使用它的短名字(json、parquet、jdbc、orc、libsvm、csv、text)。从任何类型数据源加载的DataFrames,通过使用这个语句都可以转为其他类型。
要加载一个JSON文件,你可以使用:

1
2
Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

查看完整示例,请参考:xamples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
要加载一个CSV文件,你可以使用:

1
2
3
4
5
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");

查看完整示例,请参考:xamples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Run SQL on files directly

除了使用read API加载文件到DataFrame然后查询它之外,你还可以使用SQL直接查询那个文件。

1
2
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

查看完整示例,请参考:xamples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Save Modes

保存操作可以选择一种SaveMode,它指定了如何处理存在的数据。一件非常重要的事情是这些保存模式没有利用任何锁,并且它们不是原子操作。另外,当执行Overwrite模式时,已有的数据将会在写出新数据之前被删掉。

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error” or “errorifexists” (default) 当保存一个DataFrame到一个数据源时,如果数据已经存在,预计将抛出一个异常
SaveMode.Append “append” 当保存一个DataFrame到一个数据源时,如果数据或表格已经存在,DataFrame的内容将被追加到已存在数据
SaveMode.Overwrite “overwrite” Overwrite模式意味着,当保存一个DataFrame到一个数据源时,如果数据或表格已经存在,已存在的数据将会被DataFrame的内容所覆盖
SaveMode.Ignore “ignore” Ignore模式意味着当保存一个DataFrame到一个数据源时,如果数据已经存在,保存操作将不会保存DataFrame的内容,并且不会修改已经存在的数据。这个操作类似 CREATE TABLE IF NOT EXISTS

Saving to Persistent Tables

使用saveAsTable命令,DataFrames也可以作为持久化表被保存到Hive metastore中。注意,使用这个功能不需要现有Hive的部署。Spark将会为你创建一个默认的本地Hive metastore(使用Derby)。与createOrReplaceTempView命令不同,saveAsTable将显示DataFrame的内容并创建一个指向Hive metastore中数据的指针。持久化表将在你的Spark程序重启之后持续存在,只要你维持你的连接在相同的metastore。通过在SparkSession上调用table方法(并传递表的名字),就能根据持久化表创建对应的DataFrame。
对于基于文件的数据源,如:text、parquet、json等。通过path选项,你可以指定一个自定义表路径,如:df.write.option(“path”, “/some/path”).saveAsTable(“t”)。当这个表被删除,自定义表路径将不会被移除,并且表数据依然存在。如果没有指定自定义表路径,Spark将会把数据写到仓库目录下的默认表路径。当这个表被删除时,默认表路径也会一并被删除。
从Spark2.1开始,持久化数据源表格在Hive metastore中有独立的元数据。这样做又一些优点:

因为metastore只返回查询所需的partition,因此表上的首次查询就不需要查找所有的aprtition。
Hive DDL(如ALTER TABLE PARTITION … SET LOCATION),对于使用Datasource APi来创建表都是可用的。

注意,当创建外部数据源表时(那些带有path选项的),分区信息默认是不会被收集的。要同步分区信息到metastore中,你可以执行MSCK REPAIR TABLE。

Bucketing, Sorting and Partitioning

对于基于文件的数据源,还可以对输出进行分组并排序或分组并分区。分组并排序只对持久化表适用:

1
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

完整的代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
当使用Dataset API时,partitioning能够和save以及saveAsTable一起使用。

1
2
3
4
5
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");

完整的代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
可以对单个表使用partitioning和bucketing:

1
2
3
4
5
peopleDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed");

完整的代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。
partitionBy创建了一个在Partition Discovery章节中描述的目录结构。因此,它对具有高基数的列的适用性有限。相比之下,BucketBy会跨固定数量的bucket来分布部署数据,and can be used when a number of unique values is unbounded.(!!!无法理解)

Parquet Files

Parquet时一种列式文件格式,它被很多其他数据处理系统所支持。Spark SQL对Parquet文件提供了读写支持,并能够自动保护原始数据的模式。当写Parquet文件时,为了兼容的原因,所有列被自动转换为nullable。

Loading Data Programmatically

使用上面例子中的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");
// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

完整示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Partition Discovery

在像Hive这样的系统中,常用的优化方法时进行表分区。在分区表中,数据通常存储在不同的目录中,根据分区列的值,编码到每个分区目录的路径中。所有内置文件源(包括Text/CSV/JSON/ORC/Parquet)都能够自动发现并推断分区信息。例如,我们能够将我们之前使用的数据存储到如下目录结构的分区表中,这个分区表使用两个额外的字段gender和country来作为分区字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...

通过将path/to/table传递给SparkSession.read.parquet或SparkSession.read.load,Spark SQL将自动从路径中获取分区信息。现在返回的DataFrame的模式变成:

1
2
3
4
5
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分区列的数据类型是自动推断的。当前支持数字数据类型、日期、时间戳和字符串类型。有些时候,用户可能不想自动推导分区列的数据类型。对于这种情况,自动类型推导能够通过配置项spark.sql.sources.partitionColumnTypeInference.enabled来配置,该配置默认值为True。当类型推导被禁用后,分区列将使用字符串类型。
从Spark1.6开始,分区发现默认只能查找给定路径下的。因此,对于上面的那个例子,如果用户传递path/to/table/gender=male给SparkSession.read.parquet或SparkSession.read.load,那么gender将不会被当成一个分区列。如果用户想要具体说明分区开始查找的基本目录,可以在数据源选项中设置basePath。例如,当数据目录为path/to/table/gender=male时,并且设置了basePath为path/to/table/,那么gender将会是一个分区列。

Schema Merging

和ProtocolBuffer、Avro以及Thrift一样,Parquet也支持模式演化。用户可以先从一个简单的schema开始,然后根据需要逐渐增加更多的列。通过这种方式,用户可能最终会得到不同但相互兼容的多个Parquet文件。Parquet数据源能够自动发现这种情况,并合并这些文件的schemas。
因为合并schema是一个成本相当高的操作,而且在很多情况是不必要的,因此从1.5.0开始,该功能默认是关闭的。你可以通过以下来启用它:

当你读区Parquet文件时,设置数据源选项 mergeSchema为true(下面的列子将展示)或者
设置全局SQL选项 spark.sql.parquet.mergeSchema为true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// Getters and setters...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// Getters and setters...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)

完整示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Hive metastore Parquet table conversion

当我们向Hive metastore Parquet table写数据或从中读数据时,Spark SQL奖尝试使用自己的Parquet支持来代替Hive SerDe以获取更好的性能。这个行为通过spark.sql.hive.converMetastoreParquet来配置,并且默认为打开的。

Hive/Parquet Schema Reconciliation

从表schema处理的角度来看,Hive和Parquet有两个主要区别:

1、Hive是不区分大小写的,而Parquet是区分大小写的。
2、Hive认为所有列nullable,而nullable在Parquet中很重要。

因为上面的原因,当我们将一个Hive metastore Parquet table转换为一个Spark SQL Parquet table时,我们必须将Hive metastore schema与Parquet schema调整一致。调整的规则为:

1、两个schema中相同名称的字段不管是否为空必须具有相同的数据类型。调整好的字段应当具有Parquet端的数据类型,因此nullable是具有意义的。
2、调整后的schema必须包含Hive metastore schema中定义的字段。
1)只出现在Parquet schema中的字段将从调整后的schema中删掉。
2)只出现在Hive metastore schema中的字段将被作为nullable字段添加到调整后的schema中。

Metadata Refreshing

Spark SQL为了更好的性能而缓存了Parquet metadata。当Hive metastore Parquet表转换启用时,那些被转换的表的metadata也会被缓存。如果这些表被Hive或其他外部工具更新了,你需要手动刷新它们以保证metadata的一致。

1
2
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");

Configuration

Parquet的配置可以通过两种方式完成,在SparkSession上使用setConf方法或使用SQL运行SET key=value。

Property Name Default Meaning
spark.sql.parquet.binaryAsString false 一些其他产生Parquet的系统,主要是Impala、Hive以及老版本的Spark SQL,这些系统在写Parquet schema时不区分二进制数据和字符串。这个标记告诉Spark SQL为这些系统将二进制数据按照字符串来进行兼容。
spark.sql.parquet.int96AsTimestamp true 一些其他产生Parquet的系统,特别是Impala和Hive,它们使用INT96来存储时间戳。这个标记告诉Spark SQL将INT96按照时间戳来解析,以便为那些系统提供兼容。
spark.sql.parquet.compressio.codec snappy 设置写Parquet文件的压缩编码器。如果没有在表详情的选项/属性中指定”compression”或”parquet.compression”。根据优先级排序:compression > parquet.compression > spark.sql.parquet.compression.codec。该选项可以使用的值有:none、uncompressed、snappy、gzip或lzo。
spark.sql.parquet.filterPushdown. true 当设置为True时,启用Parquet过滤器的push-down优化。
spark.sql.hive.converMetastoreParquet true 当设置为false时,Spark SQL将对parquet table使用Hive SerDe,而不是使用内置支持。
spark.sql.parquet.mergeSchema false 当设置为true时,Parquet数据源合并从所有数据文件收集的schema,如果是false,将从摘要文件中挑选schema,如果没有摘要文件可用,则随机选择一个文件。
spark.sql.optimizer.metadataOnly. true 当设置为true时,启用metadata-only查询优化,这个优化使用表的metadata来产生分区列,而不是通过对表扫描。当所有扫描过的列示分区列,且查询操作有一个满足distinct语意的聚合操作时,适用。

ORC Files

从Spark 2.3开始,Spark支持向量ORC reader,这个reader使用新的ORC文件格式来读取ORC文件。因此新增了如下配置。当spark.sql.orc.impl被设置为native且spark.sql.orc.enableVectorizedReader被设置为true时,向量读取器将用于读区原生的ORC表(这些表使用USING ORC语句创建)。对于Hive ORC serde表(使用USING HIVE OPTIONS),当spark.sql.hive.convertMetastoreOrc也被设置为true时,向量reader被使用。

Property Name Default Meaning
spark.sql.orc.impl hive ORC实现类的名字。可以是native和hive中的一个。native意味着对构建于Apache ORC 1.4.1上的原生ORC支持。hive意味着对Hive 1.2.1中的ORC库进行支持。
spark.slql.orc.enableVectorizedReader true 在native实现中启用向量化orc编码。如果为false,一个新的非向量化ORC reader被用于native实现。对于hive实现,本项可以忽略。

JSON Datasets

Spark SQL能够自动推导一个JSON dataset的schema并将它加载为一个Dataset。这个转换能够在一个Dataset上或一个JSON文件上使用SparkSession.read().json()来完成。
注意,提供的json文件不是一个典型的JSON文件。每一行必须是一个独立有效的JSON对象(其实这句话的意思就是,一个json数据必须独立一行,不能跨多行)。关于更多的信息,请查看JSON Lines text format, also called newline-delimited JSON.
要想解析多行JSON文件,需要设置multiLine选项为true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

完整的示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Hive Tables

Spark SQL还支持对Apache Hive读写数据。然而,因为Hive有很多的依赖,而这些依赖默认没有包含在Spark的发布中。如果Hive的依赖能够在classpath中找到,Spark将自动加载它们。注意,这些Hive依赖也必须在所有worker节点上存在,因为它们需要访问Hive的序列化和反序列化库(SerDes)以便访问Hive上存储的数据。
Hive的配置是通过替换conf/目录下的hive-site.xml、core-site.xml(安全配置)和hdfs-sit.xml(HDFS配置)来完成的。
当使用Hive工作时,必须实例化支持Hive的SparkSession,包括连接到已有的Hive metastore、支持Hive serdes以及Hive自定义函数。即使没有Hive环境也能够启用Hive支持。当没有通过hive-site.xml进行配置时,context自动在当前目录创建metastore_db,并创建一个由spark.sql.warehouse.dir配置指定的目录,默认目录在Spark application启动的当前目录中的spark-warehouse。注意hive-site.xml中的hive.metastore.warehouse.dir属性在Spark2.0.0中废弃了,取而代之是使用spark.sql.warehouse.dir来指定数据库在仓库中的位置。你可以需要为启动Spark appliction的用户开放写权限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...

完整示例,请查看:examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java。

Specifying storage format for Hive table

当你创建一个Hive表时,你需要指定这个表应该如何从文件系统读写数据,例如”input format”和”output format”。你还需要定义这个表应该如何将data反序列化为row,或者如何将row序列化为data,如”serde”。下面的选项可以被用来指定存储格式(”serde”、”input format”、”output format”),如:CREATE TABLE src(id int) USING hive OPTIONS(fileFormat ‘parquet’)。默认情况下,我们将以简单文本的格式读取table。值得注意的是,在创建table的时候,存储handler还不被支持,你可以在Hive端使用存储handler来创建一个table,然后使用Spark SQL来读区它。

Property Name Meaning
fileFormat 用来说明文件格式的存储格式包,包括”serde”、”input format”和”output format”。当前我们支持6中文件格式:sequencefile、rcfile、orc、parquet、textfile和avro。
inputFormat\outputFormat 这两个选项用来指定”InputFormat“和”OutputFormat“类的名字,例如:org.apache.hadoop.hive.qllio.orc.OrcInputFormat。这两个选项应该成对出现,如果你设置了”fileFormat”选项,那么你不能分别指定它们。
serde 这个选项指定了一个serde类。当设置了‘fileFormat’选项时,如果给定的‘fileFormat’已经包含了serde信息,那么不要设置这个选项。目前,“sequencefile”、“textfile”和“rcfile”不包含serde信息,因此你可以为这3种文件格式设置此选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这个选项只能被用于”textfile”的文件格式。它们定义了文件的换行符。

其他属性使用OPTIONS进行定义,将作为Hive serde属性来考虑。

Interacting with Different Versions of Hive Metastore

Spark SQL的Hive支持的最重要部分是与Hive metastore的交互,它使Spark SQL能够访问Hive表中的metadata。从Spark 1.4.0开始,使用下面描述的配置,Spark有一个独立的包用来访问不同版本的Hive metadata。注意,无论要去访问的metastore的Hive是什么版本,在Spark SQL内部将针对Hive 1.2.1进行编译,并使用这些类作为内部执行(serdes、UDFs、UDAFs等)。
下面的选项能够被用来配置获取metadata的Hive的版本:

Property Name Default Meaning
spark.sql.hive.metastore.version 1.2.1 Hive metadata的版本。可用的选项从0.12.0到1.2.1
spark.sql.hive.metastore.jars builtin 被用于实例化HiveMetastoreClient的jar的位置。这个属性有三个选项:
1)builtin: 使用Hive 1.2.1,当-Phive被启用时,它与Spark assembly绑定。当这个选择了这个选项,spark.sql.hive.metastore.version必须是1.2.1或为定义。
2) maven:从Maven库中下载指定版本的Hive jars。这个配置对于生产环境通常不推荐。
3)JVM的标准classpath格式。这个classpath必须包含了Hive和它的依赖,以及对应的版本的Hadoop。这些jar只需要存在于driver上,但是如果你实在yarn资源管理器的集群上,那么你必须确保它们和你的application一起被打包。
spark.sql.hive.metastore. sharedPrefixes com.mysql.jdbc,org.postgresql, com.microsoft.sqlserver,oracla.jdbc 那些需要使用类加载器加载的用于在Spark SQL和指定版本的Hive之间共享的类前缀,类前缀是一个逗号分隔的列表。一个需要被共享的类就是JDBC driver,它需要访问metastore。其他需要共享的类是那些需要与已经共享类交互的类。例如,由log4j使用的自定义appender。
spark.sql.hive.metastore. barrierPrefixes (empty) Spark SQL所连接的每个版本的Hive都应明确加载的类的前缀,列表以逗号分隔。例如,通常需要被共享的Hive UDFs在一个前缀中被声明(如,org.apache.spark.*)

JDBC To Other Databases

Spark SQL还有一个数据源,可以使用JDBC从其他数据库读取数据。这个功能比使用jdbcRDD更加受欢迎。这是因为结果是作为一个DataFrame被返回,这样很容易的使用Spark SQL进行处理或与其他数据源相连接。JDBC数据源在Java或Python中使用起来也很容易,因为它不需要用户提供一个ClassTag。(注意,这不同于Spark SQL JDBC Server,Spark SQL JDBC Server允许其他application使用Spark SQL运行查询)
你需要在spark classpath中添加对应数据库的JDBC driver。例如,要从Spark shell连接到postgres,你应该运行如下命令:

1
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

使用Data Source API,远程数据库中的表可以被加载为一个DataFrame或Spark SQL临时视图。用户可以在数据源选项中指定JDBC的连接属性。连接通畅需要提供user和password属性,来登陆数据源。除了连接属性外,Spark还支持如下的选项,这些选项忽略大小写:

Property Name Meaning
url 进行连接的JDBC URL。特定数据源的连接属性可能会在URL中设置。如:jdbc:postgresql://localhost/test?user=fred&password=secret。
dbtable 要读取的JDBC表。注意,在SQL查询中的From子句中有效的任何东西,都能使用。例如,你可以在括号中使用子查询来代替全表。
driver 连接URL的JDBC driver的类名。
partitionColumn, lowerBound, upperBound 这些选项中的一个被指定,那么所有的都必须被指定。此外,numPartitions必须被指定。它们描述了多个worker并行读取表数据时,应该如何分区。partitionColumn必须是表中的数值列。注意,lowerBound和upperBound仅仅用来决定分区的幅度,而不是过滤表中的行。因此表中的所有行都将被分区并返回。这个选项只能被用于读取。
numPartitions 并行读写表的最大分区数。这也确定了JDBC连接的最大并发。如果写的分区数量超过了这个限制,我们可以在写数据之前调用coalesce(numPartitions)来减少它。
fetchsize JDBC的提取大小,它确定了每次通信能够取得多少行。它能够帮助提升那些默认fetch size低的JDBC dirver的性能(比如,Orache的fetch size为10)。这个选项只能用于读操作。
batch JDBC的batch大小,它确定了每次通信能够插入多少行。这能够帮助提升JDBC dirver的性能。这个选项只能用于写操作。默认值为1000。
isolationLevel 事务的隔离级别,应用于当前连接。它可以是:NONE\READ_COMMITTED\ READ_UNCOMMITTED\REPEATABLE_READ\SERIALIZABLE中的一个,通过JDBC连接对象来定义标准事务的隔离级别,默认为READ_UNCOMMITTED。这个选项只能用于写操作。请参考java.sql.Connection文档。
sessionInitStatement session初始化声明。在到远程数据库的session被打开之后,开始读取数据之前,这个选项执行一个自定义语句(PL/SQL块)。使用这个来实现session的初始化代码。例如:option(“色上司哦那I逆天S塔特闷它”, “”“BEGIN execute immediate ‘alter session set “_serial_direct_read”=true’; END; “””)
truncate 这是一个与JDBC writer相关操作。当启用了SaveMode.Overwrite,这个选项控制删除已存在的表,而不是先drop表然后再创建表。这个更加有效率,并且避免了表的metadata被删除。然而在某些情况下,它无法工作,如新数据有不同的schema。该选项默认值为false。这个选项只用于写操作。
createTableOptions 这是一个与JDBC writer相关的操作。如果设置,该选项允许在创建表的时候设置特定数据库表和分区的选项(如,CREATE TABLE T(name string) ENGINE=InnoDB)。这个选项只能被用于写操作。
createTableColumnTypes 当创建表时,用来代替默认的数据库列类型。数据类型信息使用与CREATE TABLE columns语句(如:”name CHAR(64), comments VARCHAR(1024)”)相同的格式被指定。被指定的数据类型应该是有效的spark sql数据类型。本选项只能用于写操作。
customSchema 自定义schema用于从JDBC连接中读取数据。例如,”id DECIMAL(38, 0), name STRING”。你也可以指定部分字段,其他的时候默认类型映射。例如:”id DECIMAL(38, 0)”。列名称应该与JDBC表的相关列名称一致。用户可以指定Spark SQL的相关数据类型,而不是使用默认的。这个选项只能被用于读操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

完整示例代码,请查看:examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java。

Troubleshooting

1、JDBC dirver类对于client session和所有executor的主类加载器是可访问的。这是因为Java的DriverManager会做一个安全检查,当DriverManager要打开一个连接时,检查结果会忽略所有主类加载器无法访问的driver。一个简便的方法是修改所有worker节点的compute_classpath.sh来包含你的driver JAR。
2、一些数据库,如H2,需要将所有名字转换为大写。你需要在Spark SQL中使用大写来引用那些名字。

Performance Tuning

通过将数据缓存到内存或开启一些创新选项,一些工作量是可以优化提升性能的。

Caching Data In Memory

通过调用spark.catalog.cacheTable(“tableName”)或dataFrame.cache(),Spark能够使用内存中列式格式来缓存表。Spark SQL将扫描需要的列,并自动调整压缩,以达到最小的内存使用和GC压力。你可以使用spark.catalog.uncacheTable(“tableName”),将table从内存中移除。
配置内存缓存可以通过两种方式来实现:在SparkSession上调用setConf方法,或者使用SQL来执行SET key=value命令。

Property Name Default Meaning
spark.sql. inMemoryColumnarStorage.compressed true 当设置为true的时候,Spark SQL将基于数据的统计自动为每一列选择一种压缩编码器。
spark.sql. imMemoryColumnarStorage.batchSize 10000 控制列式缓存的批量大小。较大的批量size会影响内存会提高内存的利用率和压缩,但是会产生内存溢出的风险。

Other Configuration Options

下面的选项也能够被用来提高查询的效率。随着Spark的优化,这些选项在未来可能会被废弃。

Property Name Default Meaning
spark.sql.files.maxPartitionbytes 134217728 (128 MB) 读取文件时,单个分区的最大字节数。
saprk.sql.files.openCostInBytes 4194304 (4 MB) 打开一个文件的成本,通过在同一时间能够扫描的字节数来测量。当推送多个文件到一个partition时非常有用。提高这个值会更好,这样写小文件的partition要比写大文件的partition更加快(写小文件的partitin优先调度)。
spark.sql.broadcastTimeout 300 broadcast连接的等待时间,以秒为单位。
spark.sql.broadcastJoinThreshold 10485760 (10 MB) 当执行join操作时,为那些需要广播到所有worker节点的表设置最大字节数。通过设置这个值为-1,广播操作可以被禁用。注意,当前的统计只支持那些运行了ANALYZE TABLE COMPUTE STATISTICS命令的Hive Metastore表。
spark.sql.shuffle.partitions 200 当为join或aggregation操作而混洗数据时,用来配置使用partitions的数量。

Broadcast Hint for SQL Queries

BROADCAST hint指导Spark在使用其他表或视图join指定表时,如何广播指定表。在Spark决定join方法时,broadcast hash join被优先考虑,即使统计高于spark.sql.autoBroadcastJoinThreshold的配置。当join两边都被指定了,Spark广播具有较低统计的那边。注意Spark不保证BHJ(broadcast hash join)总是被选择,因为不是所有的情况都支持BHJ。当broadcast nested loop join被选择时,我们仍然最重提示。

1
2
import static org.apache.spark.sql.functions.broadcast;
broadcast(spark.table("src")).join(spark.table("records"), "key").show();

Distributed SQL Engine

使用Spark SQL的JDBC/ODBC或command-line interface,Spark SQL也能够具有分布式查询引擎的行为。在这种模式中,终端用户或application能够直接与Spark SQL交互来运行SQL查询,而不需要写任何的代码。

Running the Thrift JDBC/ODBC server

Thrift JDBC/ODBC server实现了相当于Hive 1.2.1中的HiveServer2。你可以使用Spark或Hive1.2.1的beeline脚本来测试JDBC server。
要启动JDBC/ODBC server,在Spark目录中运行如下:

1
./sbin/start-thriftserver.sh

这个脚本接受所有bin/spark-submit命令的行的参数,并增加了一个–hiveconf选项用来指定Hive属性。你可以执行 ./sbin/start-thriftserver.sh –help来获取完整的可用属性列表。默认,这个server监听的是本地的10000端口。要想重写这个丢昂扣,你可以修改环境变量:

1
2
3
4
5
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...

或者修改系统属性:

1
2
3
4
5
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...

现在,你可以使用beeline来测试Thrift JDBC/ODBC server:

1
./bin/beeline

在beeline中连接JDBC/ODBC server可以使用:

1
beeline> !connect jdbc:hive2://localhost:10000

beeline将会询问你用户名和密码。在非安全模式中,输入你机器的用户名和空白的密码。对于安全模式,请遵循beeline documentation的指导。

通过替换conf/中hive-site.xml、core-site.mxl和hdfs-site.xml来完成Hive的配置。

你可能还需要使用Hive提供的beeline脚本。

Thrift JDBC server还支持通过HTTP协议发送thrift RPC messages。要启用HTTP模式,可以如下修改系统属性,或者修改conf中的hive-site.xml:

1
2
3
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要进行测试,使用beeline以http模式连接到JDBC/ODBC server:

1
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

Running the Spark SQL CLI

Spark SQL CLI是一个方便的工具用来在本地模式中运行Hive metastore服务并执行来自命令的查询输入。注意,Spark SQL CLI不能与Thrift JDBC server通信,
要启动Spark SQL CLI,在Spark目录中运行如下脚本:

1
./bin/spark-sql

通过替换conf/中hive-site.xml、core-site.mxl和hdfs-site.xml来完成Hive的配置。

Reference

Data Types

Spark SQL和DataFrame支持如下数据类型:

1、Numeric types
ByteType:声明一个一个字节的有符号的整型。数值范围从-128到127。
ShortType:声明一个两字节的有符号的整型。数值范围从-32768到32767。
IntegerType:声明一个四字节的有符号的整型。数值范围从-2147483648到2147483647。
LongType:声明一个八个字节的有符号的整型。数值范围从-9223372036854775808到9223372036854775807。
FloatType:声明一个四字节的单精度浮点数值。
DoubleType:声明一个八字节的双精度浮点数。
DecimlType:声明一个任意精度的有符号的十进制数值。内部由java.math.BigDecimal支持。一个DecimlType由一个任意精度的不能整型值和一个32位的整型组成。
2、Strubg type
声明一个字符串值。
3、Binary type
BinaryType:声明一个字节序列值。
4、Boolean type
BooleanType:声明一个boolean值。
5、Datetime type
TimestampType:声明一个由year、month、day、hour、minute和second字段的值组成。
DateType:声明一个由year、month和day字段的值组成。
6、Complex types
ArrayType(elementType, containsNull):声明一个elementType类型序列。containsNull用来检测ArrayType中是否包含null的值。
MapType(keyType, valueType, valueContainsNull):由一组key-value对组成。key的数据类型由KeyType来描述,value的数据类型由valueType来描述。对于MapType的一个值,keys不允许为null。valueContainsNull
被用来检测MapTypte的values中是否包含null值。
StructType(fields):StructFields(fields)序列。
StructField(name, datatype, nullable): StructType类型的字段。字段的名称通过name指定。字段的数据类型通过datatype来指定。nullable用来决定这个fields的values是否可以有null。

Spark SQL的所有数据类型都位于org.apache.spark.sql.types包中。要访问或创建一种数据类型,请使用org.apache.spark.sql.types.DataTypes中提供的接口方法。

Data type Value type in Java API to access or create a data type
ByteType byte or Byte DataTypes.ByteType
ShortType short or Short DataTypes.ShortType
IntegerType int or Integer DataTypes.IntegerType
LongType long or Long DataTypes.LongType
FloatType float or Float DataTypes.FloatType
DoubleType double or Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType() DataTypes.createDecimalType(precision, scale)
StringType String DataTypes.StringType
BinaryType byte[] DataTypes.BinaryType
BooleanType boolean or Boolean DataTypes.BooleanType
TimestampType java.sql.Timestamp DataTypes.TimestampType
DateType java.sql.Date DateTypes.DateType
ArrayType java.util.List DataTypes.createArrayType(elementType) 注意:containsNull的值为true。
MapType java.util.Map DataTypes.createMapType(keyType, valueType) 注意,valueContainsNull的值将为true
StructType org.apache.spark.sql.Row DataTypes.createStructType(fields)
StructField The value type in Java of the data type of this field DataTypes.createStructField(name, dataType, nullable)