Spark權威指南(中文版)----第9章 資料來源
本章正式介紹Spark可以使用的開箱即用的各種其他資料來源,以及由更大的社群構建的無數其他資料來源。Spark有6個“核心”資料來源和數百個由社群編寫的外部資料來源。能夠從所有不同型別的資料來源進行讀寫,這可以說是Spark最大的優勢之一。以下是Spark的核心資料來源:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
如前所述,Spark有許多社群建立的資料來源。這裡只是一個小例子:
- Cassandra
- HBase
- MongoDB
- AWS Redshift
- XML
- 還有很多很多其他的
本章的目標是讓您能夠從Spark的核心資料來源讀寫資料,並瞭解在與第三方資料來源整合時應該尋找的內容。為了實現這一點,我們將重點放在您需要能夠識別和理解的核心概念上。
9.1.資料來源API的結構
在繼續學習如何從某些格式讀寫之前,讓我們先看看資料來源api的總體組織結構。
9.1.1.Read API Structure
讀取資料的核心結構如下:
DataFrameReader.format(...).option("key", "value").schema(...).load()
我們將使用這種格式從所有資料來源讀取資料。format是可選的,因為預設情況下Spark將使用Parquet格式。option允許您設定鍵值配置,以引數化讀取資料的方式。最後,如果資料來源提供模式,或者您打算使用模式推斷,那麼schema是可選的。當然,每種格式都有一些必需的選項,我們將在檢視每種格式時討論這些選項。
提示
Spark社群中有很多簡寫符號,資料來源read API也不例外。我們試著在整本書中保持一致,同時仍然揭示了一些簡寫符號。
9.1.2.讀資料的基礎
在Spark中讀取資料的基礎是DataFrameReader。我們通過read屬性通過SparkSession訪問它:
spark.read
有了DataFrame讀取器後,我們指定幾個值:
- The format格式
- The schema模式
- The read mode
- 一系列配置選項
格式、選項和模式每個返回一個DataFrameReader,該ader可以進行進一步的轉換,並且都是可選的,只有一個選項除外。每個資料來源都有一組特定的選項,它們決定如何將資料讀入Spark(我們將很快介紹這些選項)。至少,您必須為DataFrameReader提供一條要從中讀取的路徑。下面是一個整體樣式的例子:
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
設定選項的方法有很多種;例如,您可以構建一個map並傳遞您的配置。現在,我們將堅持您剛才看到的簡單而明確的方法。讀取模式從外部資料來源讀取資料自然會遇到格式不正確的資料,特別是在只使用半結構化資料來源時。讀取模式指定當Spark遇到格式不正確的記錄時會發生什麼。表9-1列出了讀取模式。
讀取模式 |
描述 |
Permissive(預設) |
當遇到損壞的記錄時,將所有欄位設定為null,並將所有損壞的記錄放在名為_corruption t_record的字串列中 |
dropMalformed |
刪除包含損壞記錄的行 |
failFast |
遇到不正確的記錄時立即失敗 |
9.1.3.Write API Structure
寫資料的核心結構如下:
DataFrameWriter.format(.).option(.).partitionBy(.).bucketBy(.).sortBy(.).save()
我們將使用這種格式寫入所有資料來源。format是可選的,因為預設情況下Spark將使用Parquet格式。option允許我們配置如何寫出給定的資料。僅對基於檔案的資料來源使用PartitionBy、bucketBy和sortBy;您可以使用它們來控制目標檔案的特定佈局。
9.1.4.寫資料的基礎
寫資料的基礎與讀資料的基礎非常相似。我們使用的是DataFrameWriter,而不是DataFrameReader。因為我們總是需要寫出一些給定的資料來源,所以我們通過write屬性在每個dataframe的基礎上訪問DataFrameWriter:
//in Scala
dataFrame.write
有了DataFrameWriter之後,我們指定三個值:格式、一系列選項和儲存模式。至少,您必須提供一條路徑。我們將很快討論選項的可能性,這些選項因資料來源而異。
// in Scala
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
Save modes
儲存模式指定如果Spark在指定位置找到資料會發生什麼(假設其他條件都相同)。表9-2列出了儲存模式。
Save mode |
描述 |
append |
將輸出檔案附加到該位置已經存在的檔案列表中 |
overwrite |
會完全覆蓋任何已經存在的資料 |
errorIfExists |
如果資料或檔案已經存在於指定位置,則引發錯誤並無法寫入 |
ignore |
如果該位置存在資料或檔案,則當前DataFrame不做任何操作 |
預設值是errorIfExists。這意味著,如果Spark在您要寫入的位置找到資料,它將立即導致寫入失敗。我們已經大致介紹了在使用資料來源時所需的核心概念,現在讓我們深入研究每個Spark內建支援的資料來源。
9.2.CSV檔案
CSV表示逗號分隔的值。這是一種常見的文字檔案格式,其中每一行表示一條記錄,記錄中的每個欄位用逗號分隔。CSV檔案雖然看起來結構很好,但實際上是您將遇到的最棘手的檔案格式之一,因為在生產場景中無法對它們包含什麼或如何進行結構假設。因此,CSV reader有很多選項。這些選項使您能夠解決一些問題,比如某些字元需要轉義(例如,當檔案也是逗號分隔的或以非常規方式標記的空值時,列內的逗號)。
9.2.1.CSV選項
表9-3給出了CSV reader中可用的選項。
讀/寫 |
Key |
可能值 |
預設值 |
描述 |
Both |
sep |
任何單個字串字元 |
, |
用於分隔每個欄位和值的單個字元。 |
Both |
header |
true,false |
false |
一個布林標誌,它宣告檔案中的第一行是否為列的名稱。 |
Read |
escape |
任何字元 |
\ |
轉義字元 |
Read |
inferSchema |
true, false |
false |
指定Spark在讀取檔案時是否應該推斷列型別。 |
Read |
ignoreLeadingWhiteSpace |
true, false |
false |
宣告是否跳過正在讀取的值的前導空格。 |
Read |
ignoreTrailingWhiteSpace |
true, false |
false |
宣告是否應跳過正在讀取的值的尾隨空格。 |
Both |
nullValue |
任何字元 |
“” |
宣告檔案中哪個字元表示null。 |
Both |
nanValue |
任何字元 |
NAN |
宣告CSV檔案中表示NaN或缺失字元的字元。 |
Both |
positiveInf |
任何字串或字元 |
Inf |
宣告哪個字元表示正無窮值 |
Both |
negativeInf |
任何字串或字元 |
-Inf |
宣告哪個字元表示負無窮值 |
Both |
compression or codec |
None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
None |
宣告Spark應使用什麼壓縮編解碼器來讀取或寫入檔案。 |
Both |
dateFormat |
任何符合java SimpleDataFormat的字串或字元。 |
yyyy-MM-dd |
為任何屬於日期型別的列宣告日期格式。 |
Both |
timestampFormat |
任何符合java SimpleDataFormat的字串或字元。 |
yyyy-MM-dd’T’HH:mm:ss.SSSZZ |
為任何屬於時間戳型別的列宣告時間戳格式。 |
Read |
maxColumns |
任意整數 |
20480 |
宣告檔案中的最大列數 |
Read |
maxCharsPerColumn |
任意整數 |
1000000 |
宣告列中的最大字元數 |
Read |
escapeQuotes |
true, false |
false |
宣告Spark是否應該轉義行中找到的引號。 |
Read |
maxMalformedLogPerPartition |
任意整數 |
10 |
設定每個分割槽將記錄的最大畸形行數。超過這個數字的畸形記錄將被忽略。 |
Write |
quoteAll |
true, false |
false |
指定是否應該將所有值都括在引號中,而不只是轉義具有引號字元的值。 |
Read |
multiLine |
true, false |
false |
此選項允許您讀取多行CSV檔案,其中CSV檔案中的每個邏輯行可能跨越檔案本身的多行。 |
9.2.2.讀CSV檔案
要像讀取其他格式一樣讀取CSV檔案,我們必須首先為該特定格式建立一個DataFrameReader。這裡,我們指定格式為CSV:
spark.read.format("csv")
在此之後,我們可以指定模式和選項。讓我們設定幾個選項,一些是我們在書的開頭看到的,另一些是我們還沒有看到的。我們將CSV檔案的頭部設定為true,模式為FAILFAST,模式推斷為true:
// in Scala
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.load("some/path/to/file.csv")
如前所述,我們可以使用該模式來指定對畸形資料的容忍度。例如,我們可以使用這些模式和我們在第5章中建立的模式來確保我們的檔案符合我們期望的資料:
// in Scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
.show(5)
當我們不期望資料以某種格式出現時,事情就變得棘手了,但無論如何,它是以這種方式出現的。例如,讓我們使用當前模式並將所有列型別更改為LongType。這與實際的模式不匹配,但是Spark對我們這樣做沒有問題。只有當Spark實際讀取資料時,這個問題才會顯現出來。一旦我們啟動Spark作業,由於資料不符合指定的模式,它將立即失敗(在我們執行作業之後):
// in Scala
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", LongType, true),
new StructField("ORIGIN_COUNTRY_NAME", LongType, true),
new StructField("count", LongType, false) ))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
.take(5)
通常,Spark只會在作業執行時失敗,而不是在DataFrame定義時失敗——即使我們指向一個不存在的檔案。這是由於延遲計算,我們在第2章中學習了這個概念。
9.2.3.寫CSV檔案
就像讀取資料一樣,當我們寫入CSV檔案時,有許多選項(表9-3中列出)可以用於寫入資料。這是讀取選項的子集,因為許多選項在寫入資料時不適用(如maxColumns和inferSchema)。這裡有一個例子:
// in Scala
val csvFile = spark.read.format("csv")
.option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
# in Python
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/data/flight-data/csv/2010-summary.csv")
例如,我們可以把我們的CSV檔案寫出去,作為一個TSV檔案,這很容易:
// in Scala
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")
.save("/tmp/my-tsv-file.tsv")
# in Python
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save("/tmp/my-tsv-file.tsv")
當您列出目標目錄時,您可以看到my-tsv-file實際上是一個包含許多檔案的資料夾:
$ ls /tmp/my-tsv-file.tsv/
/tmp/my-tsv-file.tsv/part-00000-35cf9453-1943-4a8c-9c82-9f6ea9742b29.csv
這實際上反映了在我們建立DataFrame時,DataFrame中分割槽的數量。如果在此之前對資料進行重新分割槽,最終會得到不同數量的檔案。我們將在本章末尾討論這種取捨。
9.3.JSON檔案
來自JavaScript領域的人可能熟悉JavaScript物件表示法,也就是通常所說的JSON。在處理這類資料時,有一些問題值得我們在開始之前考慮。在Spark中,當我們引用JSON檔案時,我們引用以行分隔的JSON檔案。這與每個檔案都有一個大型JSON物件或JSON陣列的檔案形成了對比。行分隔與多行權衡由一個選項控制:multiLine。當您將該選項設定為true時,您可以將整個檔案作為一個json物件讀取,Spark將完成將其解析為一個DataFrame的工作。以行分隔的JSON實際上是一種更穩定的格式,因為它允許您向檔案附加一條新記錄(而不是必須讀取整個檔案然後將其寫出來),這是我們推薦您使用的格式。行分隔JSON流行的另一個關鍵原因是JSON物件具有結構,而JavaScript (JSON基於JavaScript)至少具有基本型別。這使得處理起來更容易,因為Spark可以代表我們對資料做更多的假設。您會注意到,由於物件的關係,JSON選項要比CSV的選項少得多。
9.3.1.JSON配置選項
表9-4列出了JSON物件可用的選項及其描述。
讀/寫 |
Key |
可能值 |
預設值 |
描述 |
Both |
compression或codec |
None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
None |
宣告Spark應使用什麼壓縮編解碼器來讀取或寫入檔案。 |
Both |
dateFormat |
任何符合java SimpleDataFormat的字串或字元。 |
yyyy-MM-dd |
為任何屬於日期型別的列宣告日期格式。 |
Both |
timestampFormat |
任何符合java SimpleDataFormat的字串或字元。 |
yyyy-MM-dd’T’HH:mm:ss.SSSZZ |
為任何屬於時間戳型別的列宣告時間戳格式。 |
Read |
primitiveAsString |
true, false |
false |
將所有基本值推斷為字串型別。 |
Read |
allowComments |
true, false |
false |
忽略JSON記錄中的Java/ c++樣式註釋。 |
Read |
allowUnquotedFieldNames |
true, false |
false |
允許非引號JSON欄位名 |
Read |
allowSingleQuotes |
true, false |
true |
除雙引號外,還允許單引號。 |
Read |
allowNumericLeadingZeros |
true, false |
false |
允許數字的前導零(例如,00012)。 |
Read |
allowBackslashEscapingAnyCharacter |
true, false |
false |
允許使用反斜槓引用機制接受所有字元的引用。 |
Read |
columnNameOfCorruptRecord |
任意字串 |
spark.sql .column&NameOfCorruptRecord值 |
允許使用permissive模式建立的畸形字串重新命名新欄位。這將覆蓋配置值。 |
Read |
multiLine |
true, false |
false |
允許讀取非行分隔的JSON檔案。 |
現在,讀取以行分隔的JSON檔案只在格式和我們指定的選項上有所不同:
spark.read.format("json")
9.3.2.讀取JSON檔案
讓我們看一個讀取JSON檔案的例子,並比較我們看到的配置選項:
// in Scala
spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
.load("/data/flight-data/json/2010-summary.json").show(5)
# in Python
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/data/flight-data/json/2010-summary.json").show(5)
9.3.3.寫JSON檔案
編寫JSON檔案與讀取它們一樣簡單,而且,正如您所期望的那樣,資料來源並不重要。因此,我們可以重用前面建立的CSV DataFrame作為JSON檔案的源。這也遵循我們之前指定的規則:每個分割槽寫一個檔案,整個DataFrame作為一個資料夾寫。每一行也有一個JSON物件:
// in Scala
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
# in Python
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
$ ls /tmp/my-json-file.json/
/tmp/my-json-file.json/part-00000-tid-543....json
9.4.Parquet檔案
Parquet是一個開源的面向列的資料儲存,它提供了多種儲存優化,特別是對於分析工作負載。它提供了columnar壓縮,節省了儲存空間,允許讀取單獨的列而不是整個檔案。它是一種與Apache Spark非常配合的檔案格式,實際上是預設的檔案格式。我們建議將資料寫入Parquet進行長期儲存,因為從Parquet檔案讀取總是比JSON或CSV更有效。Parquet的另一個優點是它支援複雜的型別。這意味著,如果您的列是一個數組(例如,CSV檔案可能會失敗)、map或struct,您仍然能夠毫無問題地讀寫該檔案。下面是如何指定Parquet作為讀取格式:
spark.read.format("parquet")
9.4.1.讀取parquet檔案
Parquet幾乎沒有什麼選項,因為它在儲存資料時強制執行自己的模式。因此,您只需要設定格式就可以了。如果我們對DataFframe的外觀有嚴格的要求,我們可以設定模式。通常這是不必要的,因為我們可以在讀取時使用模式,這與使用CSV檔案的推斷模式類似。但是,對於Parquet檔案,這種方法更強大,因為模式構建到檔案本身(因此不需要推理)。下面是一些讀取Parquet檔案的簡單例子:
spark.read.format("parquet")
// in Scala
spark.read.format("parquet")
.load("/data/flight-data/parquet/2010-summary.parquet").show(5)
# in Python
spark.read.format("parquet")\
.load("/data/flight-data/parquet/2010-summary.parquet").show(5)
9.4.2.Parquet配置項
正如我們剛才提到的,很少有parquet的配置選項—確切地說,只有兩個—因為它有一個定義良好的規範,與Spark中的概念緊密一致。表9-5給出了這些選項。
警告
儘管只有兩種選擇,但如果使用不相容的parquet檔案,仍然會遇到問題。當您使用不同版本的Spark(尤其是舊版本)寫parquet檔案時要小心,因為這可能會引起嚴重的麻煩。
讀/寫 |
Key |
可能值 |
預設值 |
描述 |
Write |
compression 或codec |
None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
None |
宣告Spark應使用什麼壓縮編解碼器來讀取或寫入檔案。 |
Read |
mergeSchema |
true, false |
配置spark.sql.parquet.mergeSchema的值 |
您可以在相同的表/資料夾中增量地將列新增到新編寫的parquet檔案中。使用此選項可啟用或禁用此功能。 |
9.4.3.寫parquet檔案
Parquet檔案寫起來和讀起來一樣容易。我們只需指定檔案的位置。適用相同的分割槽規則:
// in Scala
csvFile.write.format("parquet").mode("overwrite")
.save("/tmp/my-parquet-file.parquet")
# in Python
csvFile.write.format("parquet").mode("overwrite")\
.save("/tmp/my-parquet-file.parquet")
9.5.ORC檔案
ORC是一種為Hadoop工作負載設計的自描述、型別感知的列檔案格式。它針對大型流讀取進行了優化,但是集成了快速查詢所需行的支援。ORC實際上沒有讀取資料的選項,因為Spark非常理解檔案格式。一個經常被問到的問題是:ORC和Parquet有什麼不同?在很大程度上,它們非常相似;基本的區別是,Parquet是使用Spark的進一步優化,而ORC是使用Hive的進一步優化。
9.5.1.讀取ORC檔案
下面是如何將ORC檔案讀入Spark:
// in Scala
spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)
# in Python
spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)
9.5.2.寫ORC檔案
在本章的這一點上,您應該很容易猜到如何寫ORC檔案。它確實遵循了我們目前看到的完全相同的模式,即我們指定格式,然後儲存檔案:
// in Scala
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")
# in Python
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")
9.6.SQL資料庫
SQL資料來源是功能更強大的聯結器之一,因為可以連線到多種系統(只要該系統使用SQL)。例如,您可以連線到MySQL資料庫、PostgreSQL資料庫或Oracle資料庫。您還可以連線到SQLite,在本例中我們將這樣做。當然,資料庫不僅僅是一組原始檔案,所以對於如何連線到資料庫,有更多的選項需要考慮。也就是說,您需要開始考慮諸如身份驗證和連線之類的問題(您需要確定Spark叢集的網路是否連線到資料庫系統的網路)。為了避免為本書的目的而設定資料庫,我們提供了一個在SQLite上執行的參考示例。通過使用SQLite,我們可以跳過很多這樣的細節,因為它可以在本地機器上以最小的設定工作,同時限制了不能在分散式設定中工作。如果希望在分散式環境中處理這些示例,則需要連線到另一種資料庫。
SQLITE簡介
SQLite是全世界使用最多的資料庫引擎,這是有原因的。它功能強大、速度快、易於理解。這是因為SQLite資料庫只是一個檔案。這將使您非常容易地啟動和執行,因為我們在本書的repository中包含了原始檔。只需將該檔案下載到本地計算機,就可以從該檔案中讀取和寫入。我們使用SQLite,但是這裡的所有程式碼都與更傳統的關係資料庫(如MySQL)一起工作。主要區別在於連線到資料庫時包含的屬性。當我們使用SQLite時,沒有使用者或密碼的概念。
警告
雖然SQLite是一個很好的參考示例,但它可能不是您想在生產中使用的。此外,SQLite在分散式設定中不一定能很好地工作,因為它需要在寫時鎖定整個資料庫。我們在這裡展示的示例也將以類似的方式使用MySQL或PostgreSQL。
要從這些資料庫讀寫,您需要做兩件事:在spark類路徑上為您的特定資料庫包含Java資料庫連線(JDBC)驅動程式,併為驅動程式本身提供適當的JAR。例如,為了能夠從PostgreSQL讀取和寫入,您可以執行如下程式碼:
./bin/spark-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jarspostgresql-9.4.1207.jar
與我們的其他源一樣,在從SQL資料庫讀取和寫入時,有許多選項可用。其中只有一些與我們當前的示例相關,但是表9-6列出了在使用JDBC資料庫時可以設定的所有選項。
屬性名稱 |
屬性含義 |
url |
要連線到的JDBC URL。特定於源的連線屬性可以在URL中指定;例如:jdbc:postgresql://localhost/test?user=fred&password=secret. |
dbtable |
要讀取的JDBC表。注意,可以使用SQL查詢的FROM子句中有效的任何內容。例如,您還可以在括號中使用子查詢,而不是完整的表。 |
driver |
用於連線到此URL的JDBC驅動程式的類名。 |
partitionColumn, lowerBound, upperBound |
如果指定了其中一個選項,那麼還必須設定所有其他選項。此外,必須指定numpartition。這些屬性描述瞭如何在從多個worker並行讀取資料時對錶進行分割槽。partitionColumn必須是表中的數字列。注意,lowerBound和lowerBound僅用於確定分割槽步長,而不是用於過濾表中的行。因此,表中的所有行都將被分割槽並返回。此選項僅適用於read。 |
numPartitions |
可用於表讀寫並行性的最大分割槽數。這也決定了併發JDBC連線的最大數量。 |
fetchsize |
此配置項決定每次往返要獲取多少行。這可以提高JDBC驅動程式的效能,JDBC驅動程式預設取值大小較低(例如,Oracle有10行)。此選項僅適用於read。 |
batchsize |
JDBC批處理大小,它決定每次往返插入多少行。這有助於JDBC驅動程式的效能。這個選項只適用於write。預設值是1000。 |
isolationLevel |
事務隔離級別,適用於當前連線。它可以是NONE、READ_COMMITTED、READ_UNCOMMITTED、REPEATABLE_READ或SERIALIZABLE中的一個,對應於JDBC連線物件定義的標準事務隔離級別。預設值是READ_UNCOMMITTED。這個選項只適用於Write。有關更多資訊,請參閱java.sql.Connection中的文件。 |
truncate |
這是一個與JDBC writer相關的選項。當啟用SaveMode.Overwrite,Spark將truncate現有表,而不是刪除和重新建立它。這可以更有效,並且可以防止刪除表元資料(例如索引)。但是,在某些情況下,例如當新資料具有不同的schema時,它將不起作用。預設值為false。這個選項只適用於write。 |
createTableOptions |
這是一個與JDBC writer相關的選項。如果指定,這個選項允許在建立表時設定特定於資料庫的表和分割槽選項(例如,CREATE table t (name string) ENGINE=InnoDB)。這個選項只適用於write。 |
createTableColumnTypes |
建立表時要使用的資料庫列資料型別,替換預設資料型別。資料型別資訊應該以與CREATE TABLE columns語法相同的格式指定(例如,“name CHAR(64),comments VARCHAR(1024)”)。指定的型別應該是有效的Spark SQL資料型別。這個選項只適用於write。 |
9.6.1.從SQL資料庫中讀取
在讀取檔案時,SQL資料庫與我們前面看到的其他資料來源沒有什麼不同。對於這些源,我們指定格式和選項,然後載入資料:
// in Scala
val driver = "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
# in Python
driver = "org.sqlite.JDBC"
path = "/data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:" + path
tablename="flight_info"
在定義連線屬性之後,可以測試到資料庫本身的連線,以確保其功能正常。這是一種優秀的故障排除技術,可以確認您的資料庫(至少)對Spark driver程式可用。這與SQLite關係不大,因為SQLite是您機器上的一個檔案,但是如果您使用的是MySQL之類的工具,您可以使用以下工具測試連線:
import java.sql.DriverManager
val connection = DriverManager.getConnection(url)
connection.isClosed()
connection.close()
如果連線成功,您就可以開始了。讓我們從SQL表中讀取DataFrame:
// in Scala
val dbDataFrame = spark.read.format("jdbc").option("url", url)
.option("dbtable", tablename).option("driver", driver).load()
# in Python
dbDataFrame = spark.read.format("jdbc").option("url", url)\
.option("dbtable",tablename).option("driver",driver).load()
SQLite的配置相當簡單(例如,沒有使用者)。其他資料庫,比如PostgreSQL,需要更多的配置引數。讓我們執行剛才執行的讀取操作,這次使用PostgreSQL:
// in Scala
val pgDF = spark.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://database_server")
.option("dbtable", "schema.tablename")
.option("user", "username").option("password","my-secret-password").load()
# in Python
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://database_server")\
.option("dbtable", "schema.tablename")\
.option("user","username").option("password","my-secret-password").load()
當我們建立這個DataFrame時,它與其他資料沒有什麼不同:您可以對其執行查詢操作、transform操作、join操作。您還會注意到,這裡已經有了一個模式。這是因為Spark從表本身收集這些資訊,並將型別對映到Spark資料型別。讓我們只得到不同的location欄位,以驗證我們可以查詢它:
太棒了,我們可以查詢資料庫!在我們繼續之前,有一些細微的細節是值得理解的。
9.6.2.Query Pushdown
首先,Spark在建立DataFrame之前盡最大努力過濾資料庫本身中的資料。例如,在之前的查詢示例中,我們可以從查詢計劃中看到,它只從表中選擇了相關的列名:
Spark實際上可以在某些查詢上做得更好。例如,如果我們在DataFrame上指定一個過濾器,Spark將把該過濾器下推到資料庫中。我們可以在PushedFilters下的explain計劃中看到這一點。
Spark不能將它自己的所有函式轉換為您正在使用的SQL資料庫中可用的函式。因此,有時需要將整個查詢傳遞到SQL中,以DataFrame的形式返回結果。這看起來有點複雜,但實際上很簡單。您只需指定一個SQL查詢,而不是指定一個表名。當然,您需要以一種特殊的方式指定它;你必須把查詢用括號括起來,然後把它重新命名——在這種情況下,我只是給了它相同的表名:
現在,當您查詢這個表時,實際上是在查詢該查詢的結果。我們可以在解釋計劃中看到這一點。Spark甚至不知道表的實際模式,只知道我們之前查詢的結果:
9.6.3.並行讀取資料庫
在本書中,我們一直在討論分割槽及其在資料處理中的重要性。Spark有一個底層演算法,可以將多個檔案讀入一個分割槽,或者反過來,根據檔案大小和檔案型別和壓縮的“可分割性”從一個檔案讀入多個分割槽。與檔案具有相同的靈活性,SQL資料庫也具有相同的靈活性,只是您必須手動配置它。正如前面的選項所示,您可以配置的是指定最大分割槽數的能力,以允許您限制並行讀寫的數量:
在這種情況下,這個分割槽仍然作為一個分割槽,因為沒有太多的資料。但是,這種配置可以幫助您確保在讀取和寫入資料時不會壓垮資料庫:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().show()
不幸的是,還有其他一些優化似乎只在另一個API集中進行。您可以通過連線本身顯式地將謂詞下推到SQL資料庫中。這種優化允許您通過指定謂詞來控制特定分割槽中特定資料的物理位置。這有點拗口,我們來看一個簡單的例子。我們的資料中只需要兩個國家的資料:安圭拉和瑞典。我們可以過濾它們並將它們推入資料庫,但是我們還可以更進一步,讓它們到達Spark中自己的分割槽。我們通過在建立資料來源時指定謂詞列表來實現:
如果指定的謂詞不是不相交的,則可能會出現大量重複的行。下面是一個謂詞示例集,它將導致重複的行:
9.6.4.基於滑動視窗的分割槽
讓我們看看如何基於謂詞進行分割槽。在本例中,我們將根據數值型別的count列進行分割槽。這裡,我們為第一個分割槽和最後一個分割槽都指定了最小值和最大值。任何超出這些界限的都在第一個分割槽或最後一個分割槽中。然後,我們設定希望的分割槽總數(這是並行度的級別)。然後Spark並行查詢我們的資料庫並返回numpartition分割槽。我們只需修改上界和下界,以便在某些分割槽中放置某些值。沒有像我們在前面的例子中看到的那樣進行過濾:
這將把區間從低到高平均分配:
// in Scala
spark.read.jdbc(url,tablename,colName,lowerBound,upperBound,numPartitions,props)
.count() // 255
# in Python
spark.read.jdbc(url, tablename, column=colName, properties=props,
lowerBound=lowerBound, upperBound=upperBound,
numPartitions=numPartitions).count()#255
9.6.5.寫入SQL資料庫
寫SQL資料庫和以前一樣簡單。您只需指定URI並根據所需的指定寫入模式寫出資料。在下面的示例中,我們指定overwrite,它覆蓋整個表。我們將使用之前定義的CSV DataFframe來實現這一點:
讓我們看看結果:
當然,我們也可以很容易地將這個新表記錄追加到表中:
9.7.Text檔案
Spark還允許您讀取純文字檔案。檔案中的每一行都成為DataFrame中的一條記錄。然後就由您來相應地轉換它。作為實現此目的的一個示例,假設您需要將一些Apache日誌檔案解析為某種更結構化的格式,或者您可能希望解析一些純文字以進行自然語言處理。文字檔案為Dataset API提供了很好的引數,因為它能夠利用native型別的靈活性。
9.7.1.讀文字檔案
讀取文字檔案很簡單:只需指定textFile的型別。對於textFile,分割槽目錄名將被忽略。要根據分割槽讀寫文字檔案,應該使用text,其在讀寫上關注分割槽:
spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value,',')asrows").show()
9.7.2.寫文字檔案
編寫文字檔案時,需要確保只有一個字串列;否則,寫入將失敗:
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")
如果在執行寫操作時執行一些分割槽(我們將在接下來的幾頁中討論分割槽),則可以寫更多的列。然而,這些列將顯示為您要寫入的資料夾中的目錄,而不是每個檔案上的列:
9.8.高階IO概念
我們在前面已經看到,我們可以通過在寫之前控制分割槽來控制所寫檔案的並行性。我們還可以通過控制兩件事來控制特定的資料佈局:bucketing和partitioning(稍後討論)。
9.8.1.可分割的檔案型別和壓縮
某些檔案格式基本上是“可分割的”。這可以提高速度,因為Spark可以避免讀取整個檔案,只訪問滿足查詢所需的檔案部分。此外,如果您正在使用Hadoop分散式檔案系統(HDFS)之類的東西,如果檔案跨越多個塊,那麼分割檔案可以提供進一步的優化。與此同時,還需要管理壓縮。並不是所有的壓縮方案都是可分割的。如何儲存資料對於使Spark作業順利執行非常重要。我們建議gzip壓縮的Parquet儲存。
9.8.2.並行讀取資料
多個執行器不能同時讀取同一個檔案,但它們可以同時讀取不同的檔案。一般來說,這意味著當您從一個包含多個檔案的資料夾中讀取資料時,這些檔案中的每一個都將成為DataFrame中的一個分割槽,並由可用的執行器executor並行讀取(其餘的檔案將排在其他檔案之後)
9.8.3.並行寫入資料
寫入的檔案或資料的數量取決於在寫入資料時DataFrame擁有的分割槽數量。預設情況下,每個資料分割槽寫一個檔案。這意味著,儘管我們指定了一個“檔案”,但它實際上是一個資料夾中的許多檔案,具有指定檔案的名稱,每個寫入的分割槽都有一個檔案。例如,下面的程式碼
csvFile.repartition(5).write.format("csv").save("/tmp/multiple.csv")
將在該資料夾中生成5個檔案。從列表中可以看到:
分割槽
分割槽是一種工具,允許您在編寫時控制儲存什麼資料(以及儲存在哪裡)。當您將檔案寫入分割槽目錄(或表)時,您基本上將列編碼為資料夾。這允許您在以後讀取資料時跳過大量資料,只讀取與您的問題相關的資料,而不必掃描整個資料集。所有基於檔案的資料來源都支援這些:
寫完後,你會得到一個資料夾列表在你的Parquet “檔案”:
每一個都包含Parquet檔案,其中包含前面謂詞為true的資料:
這可能是您可以使用的最低優化,當您的表在操作之前經常被讀取器過濾。例如,date對於分割槽特別常見,因為在下游,我們通常只想檢視前一週的資料(而不是掃描整個記錄列表)。這可以為讀者提供大量的加速。Bucketing分桶bucket是另一種檔案組織方法,您可以使用這種方法控制專門寫入每個檔案的資料。這可以幫助避免以後讀取資料時出現混亂,因為具有相同bucketID的資料都將被分組到一個物理分割槽中。這意味著資料是根據您希望以後如何使用該資料進行預分割槽的,這意味著您可以在連線或聚合時避免昂貴的重新排序。與其在特定的列上進行分割槽(可能會寫出大量目錄),還不如研究資料的分桶。這將建立一定數量的檔案,並組織我們的資料到這些“桶”:
僅spark管理的表支援分桶。想了解更多關於分塊和分塊的資訊,請觀看Spark Summit 2017的這篇演講。
https://spark-summit.org/2017/events/why-you-should-care-about-data-layout-in-the-filesystem/
9.8.4.寫複雜型別
正如我們在第6章中介紹的,Spark有各種不同的內部型別。儘管Spark可以處理所有這些型別,但並不是每種型別都能很好地處理每種資料檔案格式。例如,CSV檔案不支援複雜型別,而Parquet和ORC支援。
9.8.5.管理檔案的大小
管理檔案大小不是寫資料的重要因素,而是以後讀取資料的重要因素。當您編寫大量小檔案時,管理所有這些檔案會產生大量的元資料開銷。Spark尤其不能很好地處理小檔案,儘管許多檔案系統(如HDFS)也不能很好地處理大量小檔案。您可能會聽到這被稱為“小檔案問題”。反過來也是正確的:您也不想要太大的檔案,因為當您只需要幾行資料時,讀取整個資料塊會變得低效。Spark 2.2引入了一種新的方法,以更自動化的方式控制檔案大小。我們在前面已經看到,輸出檔案的數量是寫時分割槽數量(以及我們選擇的分割槽列)的匯出數。現在,您可以利用另一個工具來限制輸出檔案的大小,這樣您就可以獲得一個最佳的檔案大小。您可以使用maxRecordsPerFile選項並指定您所選擇的數量。這允許您通過控制寫入每個檔案的記錄數量來更好地控制檔案大小。例如,如果為writer設定一個選項為df.write。選項(“maxRecordsPerFile”,5000),Spark將確保檔案最多包含5000條記錄。
9.9.結束語
在本章中,我們討論了在Spark中讀取和寫入資料的各種選項。這幾乎涵蓋了您作為Spark的日常使用者需要知道的所有內容。對於好奇的人來說,有幾種方法可以實現您自己的資料來源;但是,我們省略瞭如何實現這一點的說明,因為API目前正在發展為更好地支援結構化流。如果您有興趣瞭解如何實現您自己的自定義資料來源,那麼Cassandra聯結器是經過良好組織和維護的,可以為喜歡冒險的人提供參考。