Spark SQL,DataFrames and DataSets Guide官方文件翻譯
阿新 • • 發佈:2019-02-01
通過使用saveAsTable命令,可以將DataFrames持久化到表中。已有的Hive部署不需要使用這個特性,Spark將會建立一個預設的本地Hive 元資料庫。和createOrRepalceTempView不同的是,saveAsTable會實體化DataFrame的內容,並且會在Hive 元資料庫中建立一個指標指向該資料。只要維護與同一個元資料庫的連線,即使你重啟Spark程式,那個持久化的表仍然會存在。通過使用SparkSession的table方法,並以表名為引數,就可以建立一個和DataFrame一樣的持久化表。
預設情況下,saveAsTable方法將建立一個“managed table”,意味著資料的位置受元資料庫控制。當表被刪除後,managed table也會刪除儲存的資料。
Parquet檔案
Parquet是多種資料處理系統支援的列式資料格式。該檔案保留了原始資料的模式,Spark SQL提供了parquet檔案的讀寫操作。
讀取Parquet檔案
例子:
在類似於Hive的系統中,對錶進行分割槽是對資料進行優化的方式之一。在一個分割槽的表中,資料通過分割槽列將資料儲存在不同的目錄下。Parquet資料來源現在能夠自動發現並解析分割槽資訊。例如,對人口資料進行分割槽儲存,分割槽列為gender和country,使用下面的目錄結構:
。如果想關閉該功能,直接將該引數設定為disabled。此時,分割槽列資料格式將被預設設定為string型別,不再進行型別解析。
從Spark1.6開始,分割槽解析只有在預設給定的路徑下才會發現分割槽。對於上面的例子,如果使用者將path/to/table/gender=male傳給SparkSession.read.parquet或SparkSession.read.load,gender不會被當做分割槽列。如果使用者需要指定分割槽解析開始的基本路徑,可以在資料來源options中設定basePath。例如,當path/to/table/gender=male是資料的路徑時,使用者設定basePath的值為path/to/table/,那麼gender就會成為分割槽列。
Schema合併
像ProtocolBuffer、Avro和Thrift,Parquet也支援schema evolution(schema演變)。使用者可以先定義一個簡單的Schema,然後逐漸的向Schema中增加列描述。通過這種方式,使用者可以獲取多個有不同Schema但相互相容的Parquet檔案。現在Parquet資料來源能自動檢測這種情況,併合並這些檔案的schemas。
因為schema合併是一個高消耗的操作,很多情況下是不需要的,所以從1.5開始預設關閉了這個功能。可以通過以下兩種方式開啟:
1、當資料來源是Parquet檔案時,將資料來源選項mergeSchema設定為true(如下面的例子)。
2、將全域性SQL選項spark.sql.parquet.mergeSchema設定為true。
例子:
分割槽解析schemaPeople.write.parquet("people.parquet")#schemaPeople是上邊例子建立的DataFrame,parquet方法將DataFrame內容以Parquet的格式進行儲存,維持著schema資訊。 # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. parquetFile = spark.read.parquet("people.parquet") #Parquet檔案也可以用來建立一個臨時的view,然後用於SQL語句中 parquetFile.createOrReplaceTempView("parquetFile"); teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") for teenName in teenNames.collect(): print(teenName)
在類似於Hive的系統中,對錶進行分割槽是對資料進行優化的方式之一。在一個分割槽的表中,資料通過分割槽列將資料儲存在不同的目錄下。Parquet資料來源現在能夠自動發現並解析分割槽資訊。例如,對人口資料進行分割槽儲存,分割槽列為gender和country,使用下面的目錄結構:
通過將path/to/table傳遞給SparkSession.read.parquet或SparkSession.read.load,Spark SQL可以根據路徑自動解析分割槽資訊。返回的DataFrame的schema如下:path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
需要注意的是,分割槽列的資料型別是自動解析的。當前支援數值型別和string型別。有時使用者不希望自動解析分割槽列的資料型別,自動解析分割槽型別的引數為:park.sql.sources.partitionColumnTypeInference.enabled,預設值為true# spark from the previous example is used in this example.
# Create a simple DataFrame, stored into a partition directory
df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\
.map(lambda i: Row(single=i, double=i * 2)))
df1.write.parquet("data/test_table/key=1")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = spark.createDataFrame(sc.parallelize(range(6, 11))
.map(lambda i: Row(single=i, triple=i * 3)))
df2.write.parquet("data/test_table/key=2")
# Read the partitioned table
df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)
# |-- key : int (nullable = true)
Hive metastore Parquet錶轉換
當向Hive metastore讀寫Parquet表時,為了更好地效能,Spark SQL將使用自帶的Parquet SerDe,而不用Hive的SerDe(SerDe:Serialize/Deserialize的簡稱,目的是用於序列化和反序列化)。這個優化的配置引數為spark.sql.hive.convertMetastoreParquet,預設是開啟的。
Hive/Parquet Schema反射(Reconciliation)
從表schema處理的角度來看,Hive和Parquet有兩個主要區別:
1、Hive不區分大小寫(is case insensitive),而Parquet區分大小寫。
2、Hive允許所有列為空,而Parquet中的空是有重要意義的。
由於這兩個區別,當我們將Hive metastore Parquet錶轉換成Spark SQL Parquet表時,需要將Hive metastore schema 和Parquet schema一致化,一致化規則如下:
1、兩個schema中同名的欄位必須具有相同的資料型別。一致化後的欄位必須為Parquet的欄位型別,所以空值是很重要的(nullability is respected)。
2、一致化後的schema只包含那些在Hive metastore schema中定義的欄位。
(1)在一致化後的schema中忽略只出現在Parquet schema的欄位。
(2)將只出現在Hive metastore schema的欄位設為nullable欄位,並加到一致化後的schema中。
元資料重新整理
為了更好的效能,Spark SQL 快取了Parquet 元資料。當Hive metastore Parquet錶轉換是enabled時,那些轉換後的表的元資料也能夠被快取。當表被Hive或其他工具更新後,為了保證元資料的一致性,需要手動重新整理元資料。示例:
# spark is an existing HiveContext
spark.refreshTable("my_table")
配置
可以使用SparkSession的setConf方法來配置Parquet,或者使用SQL執行SET key==value命令。