1. 程式人生 > >Spark 系列(十)—— Spark SQL 外部資料來源

Spark 系列(十)—— Spark SQL 外部資料來源

一、簡介

1.1 多資料來源支援

Spark 支援以下六個核心資料來源,同時 Spark 社群還提供了多達上百種資料來源的讀取方式,能夠滿足絕大部分使用場景。

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

注:以下所有測試檔案均可從本倉庫的resources 目錄進行下載

1.2 讀資料格式

所有讀取 API 遵循以下呼叫格式:

// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例
spark.read.format("csv")
.option("mode", "FAILFAST")          // 讀取模式
.option("inferSchema", "true")       // 是否自動推斷 schema
.option("path", "path/to/file(s)")   // 檔案路徑
.schema(someSchema)                  // 使用預定義的 schema      
.load()

讀取模式有以下三種可選項:

讀模式 描述
permissive 當遇到損壞的記錄時,將其所有欄位設定為 null,並將所有損壞的記錄放在名為 _corruption t_record 的字串列中
dropMalformed 刪除格式不正確的行
failFast 遇到格式不正確的資料時立即失敗

1.3 寫資料格式

// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE")         //寫模式
.option("dateFormat", "yyyy-MM-dd")  //日期格式
.option("path", "path/to/file(s)")
.save()

寫資料模式有以下四種可選項:

Scala/Java 描述
SaveMode.ErrorIfExists 如果給定的路徑已經存在檔案,則丟擲異常,這是寫資料預設的模式
SaveMode.Append 資料以追加的方式寫入
SaveMode.Overwrite 資料以覆蓋的方式寫入
SaveMode.Ignore 如果給定的路徑已經存在檔案,則不做任何操作


二、CSV

CSV 是一種常見的文字檔案格式,其中每一行表示一條記錄,記錄中的每個欄位用逗號分隔。

2.1 讀取CSV檔案

自動推斷型別讀取讀取示例:

spark.read.format("csv")
.option("header", "false")        // 檔案中的第一行是否為列的名稱
.option("mode", "FAILFAST")      // 是否快速失敗
.option("inferSchema", "true")   // 是否自動推斷 schema
.load("/usr/file/csv/dept.csv")
.show()

使用預定義型別:

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//預定義資料格式
val myManualSchema = new StructType(Array(
    StructField("deptno", LongType, nullable = false),
    StructField("dname", StringType,nullable = true),
    StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()

2.2 寫入CSV檔案

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具體的分隔符:

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

2.3 可選配置

為節省主文篇幅,所有讀寫配置項見文末 9.1 小節。


三、JSON

3.1 讀取JSON檔案

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:預設不支援一條資料記錄跨越多行 (如下),可以通過配置 multiLinetrue 來進行更改,其預設值為 false

// 預設支援單行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}

//預設不支援多行
{
  "DEPTNO": 10,
  "DNAME": "ACCOUNTING",
  "LOC": "NEW YORK"
}

3.2 寫入JSON檔案

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3 可選配置

為節省主文篇幅,所有讀寫配置項見文末 9.2 小節。


四、Parquet

Parquet 是一個開源的面向列的資料儲存,它提供了多種儲存優化,允許讀取單獨的列非整個檔案,這不僅節省了儲存空間而且提升了讀取效率,它是 Spark 是預設的檔案格式。

4.1 讀取Parquet檔案

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

2.2 寫入Parquet檔案

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

2.3 可選配置

Parquet 檔案有著自己的儲存規則,因此其可選配置項比較少,常用的有如下兩個:

讀寫操作 配置項 可選值 預設值 描述
Write compression or codec None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy
None 壓縮檔案格式
Read mergeSchema true, false 取決於配置項 spark.sql.parquet.mergeSchema 當為真時,Parquet 資料來源將所有資料檔案收集的 Schema 合併在一起,否則將從摘要檔案中選擇 Schema,如果沒有可用的摘要檔案,則從隨機資料檔案中選擇 Schema。

更多可選配置可以參閱官方文件:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html


五、ORC

ORC 是一種自描述的、型別感知的列檔案格式,它針對大型資料的讀寫進行了優化,也是大資料中常用的檔案格式。

5.1 讀取ORC檔案

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

4.2 寫入ORC檔案

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")


六、SQL Databases

Spark 同樣支援與傳統的關係型資料庫進行資料讀寫。但是 Spark 程式預設是沒有提供資料庫驅動的,所以在使用前需要將對應的資料庫驅動上傳到安裝目錄下的 jars 目錄中。下面示例使用的是 Mysql 資料庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar 上傳到 jars 目錄下。

6.1 讀取資料

讀取全表資料示例如下,這裡的 help_keyword 是 mysql 內建的字典表,只有 help_keyword_idname 兩個欄位。

spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")            //驅動
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   //資料庫地址
.option("dbtable", "help_keyword")                    //表名
.option("user", "root").option("password","root").load().show(10)

從查詢結果讀取資料:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()

//輸出
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|             10|      ALTER|
|             11|    ANALYSE|
|             12|    ANALYZE|
|             13|        AND|
|             14|    ARCHIVE|
|             15|       AREA|
|             16|         AS|
|             17|   ASBINARY|
|             18|        ASC|
|             19|     ASTEXT|
+---------------+-----------+

也可以使用如下的寫法進行資料的過濾:

val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10  OR name = 'WHEN'")   //指定資料過濾條件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() 

//輸出:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|            604|       WHEN|
+---------------+-----------+

可以使用 numPartitions 指定讀取資料的並行度:

option("numPartitions", 10)

在這裡,除了可以指定分割槽外,還可以設定上界和下界,任何小於下界的值都會被分配在第一個分割槽中,任何大於上界的值都會被分配在最後一個分割槽中。

val colName = "help_keyword_id"   //用於判斷上下界的列
val lowerBound = 300L    //下界
val upperBound = 500L    //上界
val numPartitions = 10   //分割槽綜述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
                             colName,lowerBound,upperBound,numPartitions,props)

想要驗證分割槽內容,可以使用 mapPartitionsWithIndex 這個運算元,程式碼如下:

jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
    val buffer = new ListBuffer[String]
    while (iterator.hasNext) {
        buffer.append(index + "分割槽:" + iterator.next())
    }
    buffer.toIterator
}).foreach(println)

執行結果如下:help_keyword 這張表只有 600 條左右的資料,本來資料應該均勻分佈在 10 個分割槽,但是 0 分割槽裡面卻有 319 條資料,這是因為設定了下限,所有小於 300 的資料都會被限制在第一個分割槽,即 0 分割槽。同理所有大於 500 的資料被分配在 9 分割槽,即最後一個分割槽。

6.2 寫入資料

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()


七、Text

Text 檔案在讀寫效能方面並沒有任何優勢,且不能表達明確的資料結構,所以其使用的比較少,讀寫操作如下:

7.1 讀取Text資料

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2 寫入Text資料

df.write.text("/tmp/spark/txt/dept")


八、資料讀寫高階特性

8.1 並行讀

多個 Executors 不能同時讀取同一個檔案,但它們可以同時讀取不同的檔案。這意味著當您從一個包含多個檔案的資料夾中讀取資料時,這些檔案中的每一個都將成為 DataFrame 中的一個分割槽,並由可用的 Executors 並行讀取。

8.2 並行寫

寫入的檔案或資料的數量取決於寫入資料時 DataFrame 擁有的分割槽數量。預設情況下,每個資料分割槽寫一個檔案。

8.3 分割槽寫入

分割槽和分桶這兩個概念和 Hive 中分割槽表和分桶表是一致的。都是將資料按照一定規則進行拆分儲存。需要注意的是 partitionBy 指定的分割槽和 RDD 中分割槽不是一個概念:這裡的分割槽表現為輸出目錄的子目錄,資料分別儲存在對應的子目錄中。

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出檔案。

8.3 分桶寫入

分桶寫入就是將資料按照指定的列和桶數進行雜湊,目前分桶寫入只支援儲存為表,實際上這就是 Hive 的分桶表。

val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

8.5 檔案大小管理

如果寫入產生小檔案數量過多,這時會產生大量的元資料開銷。Spark 和 HDFS 一樣,都不能很好的處理這個問題,這被稱為“small file problem”。同時資料檔案也不能過大,否則在查詢時會有不必要的效能開銷,因此要把檔案大小控制在一個合理的範圍內。

在上文我們已經介紹過可以通過分割槽數量來控制生成檔案的數量,從而間接控制檔案大小。Spark 2.2 引入了一種新的方法,以更自動化的方式控制檔案大小,這就是 maxRecordsPerFile 引數,它允許你通過控制寫入檔案的記錄數來控制檔案大小。

 // Spark 將確保檔案最多包含 5000 條記錄
df.write.option(“maxRecordsPerFile”, 5000)


九、可選配置附錄

9.1 CSV讀寫可選配置

讀\寫操作 配置項 可選值 預設值 描述
Both seq 任意字元 ,(逗號) 分隔符
Both header true, false false 檔案中的第一行是否為列的名稱。
Read escape 任意字元 \ 轉義字元
Read inferSchema true, false false 是否自動推斷列型別
Read ignoreLeadingWhiteSpace true, false false 是否跳過值前面的空格
Both ignoreTrailingWhiteSpace true, false false 是否跳過值後面的空格
Both nullValue 任意字元 “” 宣告檔案中哪個字元表示空值
Both nanValue 任意字元 NaN 宣告哪個值表示 NaN 或者預設值
Both positiveInf 任意字元 Inf 正無窮
Both negativeInf 任意字元 -Inf 負無窮
Both compression or codec None,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none 檔案壓縮格式
Both dateFormat 任何能轉換為 Java 的
SimpleDataFormat 的字串
yyyy-MM-dd 日期格式
Both timestampFormat 任何能轉換為 Java 的
SimpleDataFormat 的字串
yyyy-MMdd’T’HH:mm:ss.SSSZZ 時間戳格式
Read maxColumns 任意整數 20480 宣告檔案中的最大列數
Read maxCharsPerColumn 任意整數 1000000 宣告一個列中的最大字元數。
Read escapeQuotes true, false true 是否應該轉義行中的引號。
Read maxMalformedLogPerPartition 任意整數 10 宣告每個分割槽中最多允許多少條格式錯誤的資料,超過這個值後格式錯誤的資料將不會被讀取
Write quoteAll true, false false 指定是否應該將所有值都括在引號中,而不只是轉義具有引號字元的值。
Read multiLine true, false false 是否允許每條完整記錄跨域多行

9.2 JSON讀寫可選配置

讀\寫操作 配置項 可選值 預設值
Both compression or codec None,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none
Both dateFormat 任何能轉換為 Java 的 SimpleDataFormat 的字串 yyyy-MM-dd
Both timestampFormat 任何能轉換為 Java 的 SimpleDataFormat 的字串 yyyy-MMdd’T’HH:mm:ss.SSSZZ
Read primitiveAsString true, false false
Read allowComments true, false false
Read allowUnquotedFieldNames true, false false
Read allowSingleQuotes true, false true
Read allowNumericLeadingZeros true, false false
Read allowBackslashEscapingAnyCharacter true, false false
Read columnNameOfCorruptRecord true, false Value of spark.sql.column&NameOf
Read multiLine true, false false

9.3 資料庫讀寫可選配置

屬性名稱 含義
url 資料庫地址
dbtable 表名稱
driver 資料庫驅動
partitionColumn,
lowerBound, upperBoun
分割槽總數,上界,下界
numPartitions 可用於表讀寫並行性的最大分割槽數。如果要寫的分割槽數量超過這個限制,那麼可以呼叫 coalesce(numpartition) 重置分割槽數。
fetchsize 每次往返要獲取多少行資料。此選項僅適用於讀取資料。
batchsize 每次往返插入多少行資料,這個選項只適用於寫入資料。預設值是 1000。
isolationLevel 事務隔離級別:可以是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即標準事務隔離級別。
預設值是 READ_UNCOMMITTED。這個選項只適用於資料讀取。
createTableOptions 寫入資料時自定義建立表的相關配置
createTableColumnTypes 寫入資料時自定義建立列的列型別

資料庫讀寫更多配置可以參閱官方文件:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

參考資料

  1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
  2. https://spark.apache.org/docs/latest/sql-data-sources.html

更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南