Spark筆記之Catalog
一、什麽是Catalog
Spark SQL提供了執行sql語句的支持,sql語句是以表的方式組織使用數據的,而表本身是如何組織存儲的呢,肯定是存在一些元數據之類的東西了,Catalog就是Spark 2.0之後提供的訪問元數據的類:
Catalog提供一些API用來對數據庫、表、視圖、緩存、列、函數(UDF/UDAF)進行操作,下文將一一介紹。
二、如何使用Catalog
得到Catalog:
val spark = SparkSession.builder().master("local[*]").appName("catalog-study").getOrCreate() val catalog = spark.catalog
Catalog相關的代碼存放在org.apache.spark.sql.catalog下:
上面的Catalog只是一個接口定義規範,具體實現還有一個org.apache.spark.sql.internal.CatalogImpl,如果只是使用Spark完成工作的話只閱讀接口定義基本夠用了。
三、相關API
數據庫相關
看數據庫相關的操作之前先看一下Catalog對數據庫的表示:
/** * A database in Spark, as returned by the `listDatabases` method defined in [[Catalog]]. * * @param name name of the database. * @param description description of the database. * @param locationUri path (in the form of a uri) to data files. * @since 2.0.0 */ @InterfaceStability.Stable class Database( val name: String, @Nullable val description: String, val locationUri: String) extends DefinedByConstructorParams { override def toString: String = { "Database[" + s"name=‘$name‘, " + Option(description).map { d => s"description=‘$d‘, " }.getOrElse("") + s"path=‘$locationUri‘]" } }
Catalog使用三個字段表示一個數據庫:
name:數據庫名字
descripttion:數據庫描述,可以認為是註釋
locationUri:數據庫的數據保存位置
currentDatabase: String
返回當前使用的數據庫,相當於select database();
setCurrentDatabase(dbName: String): Unit設置當前使用的數據庫,相當於use database_name;
listDatabases(): Dataset[Database]
查看所有數據庫,相當於show databases;
getDatabase(dbName: String): Database
獲取某數據庫的元數據,返回值是Database類型的,如果指定的數據庫不存在則會@throws[AnalysisException]("database does not exist")
databaseExists(dbName: String): Boolean
判斷某個數據庫是否已經存在,返回boolean值。
為了避免拋異常對單個數據庫進行getDatabase獲取元數據之前還是先使用databaseExists確定數據庫已經存在。
表/視圖相關
同樣的,對表或視圖Catalog也用一個class來表示:
/** * A table in Spark, as returned by the `listTables` method in [[Catalog]]. * * @param name name of the table. * @param database name of the database the table belongs to. * @param description description of the table. * @param tableType type of the table (e.g. view, table). * @param isTemporary whether the table is a temporary table. * @since 2.0.0 */ @InterfaceStability.Stable class Table( val name: String, @Nullable val database: String, @Nullable val description: String, val tableType: String, val isTemporary: Boolean) extends DefinedByConstructorParams { override def toString: String = { "Table[" + s"name=‘$name‘, " + Option(database).map { d => s"database=‘$d‘, " }.getOrElse("") + Option(description).map { d => s"description=‘$d‘, " }.getOrElse("") + s"tableType=‘$tableType‘, " + s"isTemporary=‘$isTemporary‘]" } }
name:表的名字
database:表所屬的數據庫的名字
description:表的描述信息
tableType:用於區分是表還是視圖,兩個取值:table或view。
isTemporary:是否是臨時表或臨時視圖,解釋一下啥是臨時表,臨時表就是使用Dataset或DataFrame的createOrReplaceTempView等類似的API註冊的視圖或表,當此次Spark任務結束後這些表就沒了,再次使用的話還要再進行註冊,而非臨時表就是在Hive中真實存在的,開啟Hive支持就能夠直接使用的,本次Spark任務結束後表仍然能存在,下次啟動不需要重新做任何處理就能夠使用,表是持久的,這種不是臨時表。
listTables(): Dataset[Table]
查看所有表或視圖,相當於show tables;
listTables(dbName: String): Dataset[Table]
返回指定數據庫下的表或視圖,如果指定的數據庫不存在則會拋出@throws[AnalysisException]("database does not exist")表示數據庫不存在。
getTable(tableName: String): Table getTable(dbName: String, tableName: String): Table
獲取表的元信息,不存在則會拋出異常。
tableExists(tableName: String): Boolean tableExists(dbName: String, tableName: String): Boolean
判斷表或視圖是否存在,返回boolean值。
dropTempView(viewName: String): Boolean dropGlobalTempView(viewName: String): Boolean
使用createOrReplaceTempView類似API註冊的臨時視圖可以使用此方法刪除,如果這個視圖已經被緩存過的話會自動清除緩存。
recoverPartitions(tableName: String): Unit
isCached(tableName: String): Boolean
用於判斷一個表否已經緩存過了。
cacheTable(tableName: String): Unit cacheTable(tableName: String, storageLevel: StorageLevel): Unit用於緩存表
uncacheTable(tableName: String): Unit
對表取消緩存
clearCache(): Unit
清空所有緩存
refreshTable(tableName: String): Unit
Spark為了性能考慮,對表的元數據做了緩存,所以當被緩存的表已經改變時也必須刷新元數據重新緩存。
refreshByPath(path: String): Unit
createTable(tableName: String, path: String): DataFrame createTable(tableName: String, path: String, source: String): DataFrame createTable(tableName: String, source: String, options: java.util.Map[String, String]): DataFrame createTable(tableName: String, source: String, options: Map[String, String]): DataFrame createTable(tableName: String, source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
函數相關
Catalog對函數的表示:
/** * A user-defined function in Spark, as returned by `listFunctions` method in [[Catalog]]. * * @param name name of the function. * @param database name of the database the function belongs to. * @param description description of the function; description can be null. * @param className the fully qualified class name of the function. * @param isTemporary whether the function is a temporary function or not. * @since 2.0.0 */ @InterfaceStability.Stable class Function( val name: String, @Nullable val database: String, @Nullable val description: String, val className: String, val isTemporary: Boolean) extends DefinedByConstructorParams { override def toString: String = { "Function[" + s"name=‘$name‘, " + Option(database).map { d => s"database=‘$d‘, " }.getOrElse("") + Option(description).map { d => s"description=‘$d‘, " }.getOrElse("") + s"className=‘$className‘, " + s"isTemporary=‘$isTemporary‘]" } }
name:函數的名字
database:函數註冊在哪個數據庫下,函數是跟數據庫綁定的
description:對函數的描述信息,可以理解成註釋
className:函數其實就是一個class,調用函數就是調用類的方法,className表示函數對應的class的全路徑類名
isTemporary:是否是臨時函數。
listFunctions(): Dataset[Function]
列出當前數據庫下的所有函數,包括註冊的臨時函數。
listFunctions(dbName: String): Dataset[Function]
列出指定數據庫下註冊的所有函數,包括臨時函數,如果指定的數據庫不存在的話則會拋出@throws[AnalysisException]("database does not exist")表示數據庫不存在。
getFunction(functionName: String): Function getFunction(dbName: String, functionName: String): Function
獲取函數的元信息,函數不存在則會拋出異常。
functionExists(functionName: String): Boolean functionExists(dbName: String, functionName: String): Boolean判斷函數是否存在,返回boolean值。
對表或視圖的列相關的操作
Catalog對列的表示:
/** * A column in Spark, as returned by `listColumns` method in [[Catalog]]. * * @param name name of the column. * @param description description of the column. * @param dataType data type of the column. * @param nullable whether the column is nullable. * @param isPartition whether the column is a partition column. * @param isBucket whether the column is a bucket column. * @since 2.0.0 */ @InterfaceStability.Stable class Column( val name: String, @Nullable val description: String, val dataType: String, val nullable: Boolean, val isPartition: Boolean, val isBucket: Boolean) extends DefinedByConstructorParams { override def toString: String = { "Column[" + s"name=‘$name‘, " + Option(description).map { d => s"description=‘$d‘, " }.getOrElse("") + s"dataType=‘$dataType‘, " + s"nullable=‘$nullable‘, " + s"isPartition=‘$isPartition‘, " + s"isBucket=‘$isBucket‘]" } }
name:列的名字
description:列的描述信息,與註釋差不多
dataType:列的數據類型
nullable:列是否允許為null
isPartition:是否是分區列
isBucket:是否是桶列
listColumns(tableName: String): Dataset[Column] listColumns(dbName: String, tableName: String): Dataset[Column]
列出指定的表或視圖有哪些列,表不存在則拋異常。
相關資料:
1. Spark 2.0介紹:Catalog API介紹和使用
2. Java Doc: Class Catalog
.
Spark筆記之Catalog