spark SQL(三)資料來源 Data Source----通用的資料 載入/儲存功能
Spark SQL 的資料來源------通用的資料 載入/儲存功能
Spark SQL支援通過DataFrame介面在各種資料來源上進行操作。DataFrame可以使用關係變換進行操作,也可以用來建立臨時檢視。將DataFrame 註冊為臨時檢視允許您對其資料執行SQL查詢。本節介紹使用Spark Data Sources載入和儲存資料的一般方法,然後介紹可用於內建資料來源的特定選 項。
1, 常用的載入和儲存功能。
最簡單的形式,預設的資料來源(parquet除非另有配置 spark.sql.sources.default)將用於所有的操作。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
2,手動指定選項
您也可以手動指定將要使用的資料來源以及您想要傳遞給資料來源的其他選項。資料來源通過其全名指定(即org.apache.spark.sql.parquet),但內建的來源,你也可以使用自己的短名稱(json,parquet,jdbc,orc,libsvm,csv,text)。從任何資料來源型別載入的資料框可以使用此語法轉換為其他型別。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
3, 直接在檔案上執行SQL
您可以使用SQL直接查詢該檔案,而不是使用讀取API將檔案載入到DataFrame中並進行查詢。
其中people.csv的資料為:val peopleDFCsv = spark.read.format("csv") .option("sep", ";") .option("inferSchema", "true") .option("header", "true") .load("examples/src/main/resources/people.csv") val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
name;age;job
Jorge;30;Developer
Bob;32;Developer
4,儲存模式
儲存操作可以選擇一個Save Mode,指定如何處理現有的資料(如果存在)。認識到這些儲存模式不使用任何鎖定資料而不是原子性的操作資料是很重要的。另外,執行時重寫資料,資料在寫出新資料之前將被刪除。常見型別如下:
Scala/Java | Any Language | Meaning |
SaveMode.ErrorIfExists (default) | "error" (default) | 如果資料已經存在,將DataFrame儲存到資料來源時,則預計會丟擲異常。 |
SaveMode.Append | "append" | 如果data / table已經存在,將DataFrame儲存到資料來源時,則DataFrame的內容將被新增到現有資料中。 |
SaveMode.Overwrite | "overwrite" | 覆蓋模式意味著將DataFrame儲存到資料來源時,如果data / table已經存在,則現有資料將被DataFrame的內容覆蓋。 |
SaveMode.Ignore | "ignore" | 忽略模式意味著,當將DataFrame儲存到資料來源時,如果資料已經存在,儲存操作將不會儲存DataFrame的內容,也不會更改現有資料。這與CREATE TABLE IF NOT EXISTSSQL中的類似。 |
DataFrames也可以使用該saveAsTable 命令將其作為持久表儲存到Hive Metastore中。請注意,現有的Hive部署對於使用此功能不是必需的。Spark將為您建立一個預設的本地Hive Metastore(使用Derby)。與createOrReplaceTempView命令不同的是,
saveAsTable將實現DataFrame的內容並建立指向Hive Metastore中的資料的指標。即使您的Spark程式重新啟動後,永久性表格仍然存在,只要您保持與同一Metastore的連線即可。用於持久表的DataFrame可以通過使用表的名稱呼叫tablea方法來建立SparkSession。
對於基於檔案的資料來源,例如文字,parquet,json等,您可以通過path選項指定一個自定義表格路徑 ,例如df.write.option("path", "/some/path").saveAsTable("t")。當表被刪除時,自定義表路徑將不會被刪除,表資料仍然存在。如果沒有指定自定義表格路徑,Spark會將資料寫入倉庫目錄下的預設表格路徑。當表被刪除時,預設的表路徑也將被刪除。
從Spark 2.1開始,持久資料來源表具有儲存在Hive Metastore中的每個分割槽元資料。這帶來了幾個好處:
1) 由於Metastore只能返回查詢所需的分割槽,因此不再需要發現第一個查詢的所有分割槽。
2) Hive DDL如ALTER TABLE PARTITION ... SET LOCATION現在可用於使用Datasource API建立的表。
請注意,建立外部資料來源表(具有path選項的那些表)時,預設情況下不會收集分割槽資訊。要同步Metastore中的分割槽資訊,可以呼叫MSCK REPAIR TABLE。
6,Bucketing(分段), Sorting(排序) and Partitioning(分割槽)
對於基於檔案的資料來源,也可以對輸出進行分類。分段和排序僅適用於持久表:
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
而分割槽則可以同時使用save和saveAsTable使用資料集API。 usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
同時也可以對單個表使用分割槽和分割槽:
peopleDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed")
partitionBy
建立一個目錄結構,如“ Partition
Discovery”部分所述。因此,對基數高的柱子的適用性有限。相比之下 bucketBy
,通過固定數量的桶分配資料,並且可以在大量唯一值無界時使用。上述完整的例子程式碼如下:
private def runBasicDataSourceExample(spark: SparkSession): Unit = {
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
peopleDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed")
spark.sql("DROP TABLE IF EXISTS people_bucketed")
spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
}
其中people.json測試資料如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}