Spark Catalog

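Spark SQL can execute SQL statements, and SQL organizes and uses data in the form of tables. So how are the tables themselves organized and stored? There has to be metadata somewhere, and Catalog is the class, available since Spark 2.0, for accessing that metadata.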

Catalog provides APIs for working with databases, tables, views, the cache, columns, and functions.
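As a minimal sketch of how this API is reached, the Catalog is exposed on a session as spark.catalog. The example assumes the spark-sql dependency is on the classpath and uses a local session; the object name CatalogEntryPoint, the helper currentDb, and the app name are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object CatalogEntryPoint {
  // Returns the name of the session's current database via the Catalog API.
  def currentDb(spark: SparkSession): String = spark.catalog.currentDatabase

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; master and appName are assumptions.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("catalog-entry-point")
      .getOrCreate()

    println(currentDb(spark))            // current database of this session
    spark.catalog.listDatabases().show() // Dataset[Database]
    spark.catalog.listTables().show()    // Dataset[Table] for the current database

    spark.stop()
  }
}
```

In a fresh session without Hive support, the current database is normally the built-in default database.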

The Catalog code lives in the spark-sql module, in the org.apache.spark.sql.catalog package. The objects it defines are:

class Database(
    val name: String,
    @Nullable val description: String,
    val locationUri: String) extends DefinedByConstructorParams

class Table(
    val name: String,
    @Nullable val database: String,
    @Nullable val description: String,
    val tableType: String,
    val isTemporary: Boolean) extends DefinedByConstructorParams

class Column(
    val name: String,
    @Nullable val description: String,
    val dataType: String,
    val nullable: Boolean,
    val isPartition: Boolean,
    val isBucket: Boolean) extends DefinedByConstructorParams

class Function(
    val name: String,
    @Nullable val database: String,
    @Nullable val description: String,
    val className: String,
    val isTemporary: Boolean) extends DefinedByConstructorParams
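To show how these metadata objects appear in practice, the following sketch registers a temporary view and reads the fields of the Table and Column entries that the Catalog returns for it. It assumes a local SparkSession; the object name CatalogMetadataObjects, the helper describeView, and the view name people are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object CatalogMetadataObjects {
  // Registers a temporary view, then looks it up in listTables() and returns
  // (name, tableType, isTemporary) if an entry with that name is found.
  def describeView(spark: SparkSession, viewName: String): Option[(String, String, Boolean)] = {
    import spark.implicits._
    Seq((1, "a"), (2, "b")).toDF("id", "label").createOrReplaceTempView(viewName)
    spark.catalog.listTables().collect()
      .find(_.name == viewName)
      .map(t => (t.name, t.tableType, t.isTemporary))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("catalog-metadata-objects")
      .getOrCreate()

    println(describeView(spark, "people"))

    // Each Column carries the fields listed in the class definition above.
    spark.catalog.listColumns("people").collect().foreach { c =>
      println(s"${c.name}: ${c.dataType}, nullable=${c.nullable}, partition=${c.isPartition}")
    }

    spark.stop()
  }
}
```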

Catalog itself defines the following interface:

def currentDatabase: String
def setCurrentDatabase(dbName: String): Unit
def listDatabases(): Dataset[Database]
def listTables(): Dataset[Table]
def listTables(dbName: String): Dataset[Table]
def listFunctions(): Dataset[Function]
def listFunctions(dbName: String): Dataset[Function]
def listColumns(tableName: String): Dataset[Column]
def listColumns(dbName: String, tableName: String): Dataset[Column]
def getDatabase(dbName: String): Database
def getTable(tableName: String): Table
def getTable(dbName: String, tableName: String): Table
def getFunction(functionName: String): Function
def getFunction(dbName: String, functionName: String): Function
def databaseExists(dbName: String): Boolean
def tableExists(tableName: String): Boolean
def tableExists(dbName: String, tableName: String): Boolean
def functionExists(functionName: String): Boolean
def functionExists(dbName: String, functionName: String): Boolean
def createTable(tableName: String, path: String, source: String): DataFrame
def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
def dropTempView(viewName: String): Boolean
def dropGlobalTempView(viewName: String): Boolean
def recoverPartitions(tableName: String): Unit
def isCached(tableName: String): Boolean
def cacheTable(tableName: String): Unit
def uncacheTable(tableName: String): Unit
def clearCache(): Unit
def refreshTable(tableName: String): Unit
def refreshByPath(path: String): Unit
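A few of these methods can be exercised together. The sketch below, assuming a local SparkSession, checks a temporary view's existence, caches and uncaches it, and finally drops it. The object name CatalogCacheOps, the helper cacheRoundTrip, and the view name numbers are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object CatalogCacheOps {
  // Returns the isCached state before caching, after caching, and after uncaching.
  def cacheRoundTrip(spark: SparkSession, viewName: String): (Boolean, Boolean, Boolean) = {
    spark.range(5).createOrReplaceTempView(viewName)
    val before = spark.catalog.isCached(viewName)
    spark.catalog.cacheTable(viewName)   // registers the view for caching
    val during = spark.catalog.isCached(viewName)
    spark.catalog.uncacheTable(viewName)
    val after = spark.catalog.isCached(viewName)
    (before, during, after)
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("catalog-cache-ops")
      .getOrCreate()

    println(cacheRoundTrip(spark, "numbers"))
    println(spark.catalog.tableExists("numbers"))  // temporary views count as existing tables
    println(spark.catalog.dropTempView("numbers")) // Boolean: whether a view was dropped
    println(spark.catalog.tableExists("numbers"))

    spark.stop()
  }
}
```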

In Spark, Catalog has a single implementation class, CatalogImpl, and that implementation relies on SessionCatalog (obtained through sparkSession.sessionState.catalog). SessionCatalog defines methods such as:

// Whether the database exists
def databaseExists(db: String): Boolean = {
  val dbName = formatDatabaseName(db)
  externalCatalog.databaseExists(dbName)
}

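As the method definition above shows, SessionCatalog in turn relies on externalCatalog. The externalCatalog is passed in as a parameter when the SessionCatalog is created. ExternalCatalog will be discussed later; for now the focus is only on CatalogImpl and SessionCatalog.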
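The delegation chain described above can be sketched in plain Scala. Every type here (SimpleExternalCatalog, InMemoryExternalCatalog, SimpleSessionCatalog, SimpleCatalogImpl) is a hypothetical, heavily simplified stand-in rather than Spark's real class, and the lower-casing in formatDatabaseName is an assumption that models case-insensitive name resolution.

```scala
import java.util.Locale

// Hypothetical stand-in for the external catalog (the actual metadata store).
trait SimpleExternalCatalog {
  def databaseExists(db: String): Boolean
}

class InMemoryExternalCatalog(databases: Set[String]) extends SimpleExternalCatalog {
  override def databaseExists(db: String): Boolean = databases.contains(db)
}

// Receives the external catalog as a constructor argument, as described above.
class SimpleSessionCatalog(externalCatalog: SimpleExternalCatalog) {
  // Assumption: names are normalized to lower case before the lookup.
  private def formatDatabaseName(name: String): String = name.toLowerCase(Locale.ROOT)

  def databaseExists(db: String): Boolean =
    externalCatalog.databaseExists(formatDatabaseName(db))
}

// User-facing layer that simply forwards to the session catalog.
class SimpleCatalogImpl(sessionCatalog: SimpleSessionCatalog) {
  def databaseExists(dbName: String): Boolean = sessionCatalog.databaseExists(dbName)
}

object CatalogDelegationSketch {
  def main(args: Array[String]): Unit = {
    val external = new InMemoryExternalCatalog(Set("default", "sales"))
    val catalog = new SimpleCatalogImpl(new SimpleSessionCatalog(external))
    println(catalog.databaseExists("SALES"))   // true: the name is lower-cased before lookup
    println(catalog.databaseExists("missing")) // false
  }
}
```

The point of the layering is that the user-facing class holds no metadata of its own; each call is answered by whatever external catalog was injected.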

Besides SessionCatalog, CatalogImpl also uses sparkSession.sessionState.sqlParser to parse the identifiers of objects such as tables and functions, for example:

sparkSession.sessionState.sqlParser.parseTableIdentifier
sparkSession.sessionState.sqlParser.parseFunctionIdentifier
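To make the effect of these parser calls concrete, the sketch below parses names outside a session by using CatalystSqlParser, which exposes the same parseTableIdentifier and parseFunctionIdentifier methods. This is an internal Catalyst API that may change between releases, and using it in place of the session's own parser is an assumption made to keep the example small. The names IdentifierParsing, sales.orders, and sales.my_udf are invented.

```scala
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

object IdentifierParsing {
  // Splits a possibly qualified table name into its table and optional database parts.
  def parseTable(name: String): TableIdentifier =
    CatalystSqlParser.parseTableIdentifier(name)

  // Same idea for function names.
  def parseFunction(name: String): FunctionIdentifier =
    CatalystSqlParser.parseFunctionIdentifier(name)

  def main(args: Array[String]): Unit = {
    val qualified = parseTable("sales.orders")
    println(qualified.database) // Some(sales)
    println(qualified.table)    // orders

    val bare = parseTable("orders")
    println(bare.database)      // None

    val fn = parseFunction("sales.my_udf")
    println(fn.database)        // Some(sales)
    println(fn.funcName)        // my_udf
  }
}
```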

createTable
CatalogImpl implements createTable as follows:

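override def createTable(
    tableName: String,
    source: String,
    schema: StructType,
    options: Map[String, String]): DataFrame = {
  // Parse the (possibly database-qualified) table name.
  val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  // Build the storage description (location and properties) from the options.
  val storage = DataSource.buildStorageFormatFromOptions(options)
  // A table with an explicit location is EXTERNAL; otherwise it is MANAGED.
  val tableType = if (storage.locationUri.isDefined) {
    CatalogTableType.EXTERNAL
  } else {
    CatalogTableType.MANAGED
  }
  val tableDesc = CatalogTable(
    identifier = tableIdent,
    tableType = tableType,
    storage = storage,
    schema = schema,
    provider = Some(source)
  )
  // Fail if the table already exists (SaveMode.ErrorIfExists).
  val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
  // Execute the plan to create the table, then return it as a DataFrame.
  sparkSession.sessionState.executePlan(plan).toRdd
  sparkSession.table(tableIdent)
}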
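The branch on storage.locationUri can be observed from the public API. The sketch below, assuming a local SparkSession without Hive support and a Spark version that provides this createTable overload, creates one table without a path option and one with it, then reads back each table's tableType. Following the code above, the first is expected to be MANAGED and the second EXTERNAL. The object name CreateTableTypes, the table names, and the temporary directories are illustrative.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CreateTableTypes {
  private val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("label", StringType)))

  // Creates one table without a path and one with a path,
  // and returns the tableType reported for each.
  def createBoth(spark: SparkSession, externalDir: String): (String, String) = {
    spark.catalog.createTable("t_managed", "parquet", schema, Map.empty[String, String])
    spark.catalog.createTable("t_external", "parquet", schema, Map("path" -> externalDir))
    (spark.catalog.getTable("t_managed").tableType,
     spark.catalog.getTable("t_external").tableType)
  }

  def main(args: Array[String]): Unit = {
    // Temporary directories keep the example from writing into the working directory.
    val warehouse = Files.createTempDirectory("warehouse").toString
    val external = Files.createTempDirectory("external").toString

    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("create-table-types")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreate()

    println(createBoth(spark, external))

    spark.stop()
  }
}
```

Because the implementation uses SaveMode.ErrorIfExists, running createBoth a second time in the same session should fail with an error saying the table already exists.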