《Spark 官方文件》Spark SQL, DataFrames 以及 Datasets 程式設計指南
Spark SQL, DataFrames 以及 Datasets 程式設計指南
概要
Spark SQL是Spark中處理結構化資料的模組。與基礎的Spark RDD API不同,Spark SQL的介面提供了更多關於資料的結構資訊和計算任務的執行時資訊。在Spark內部,Spark SQL會能夠用於做優化的資訊比RDD API更多一些。Spark SQL如今有了三種不同的API:SQL語句、DataFrame API和最新的Dataset API。不過真正執行計算的時候,無論你使用哪種API或語言,Spark SQL使用的執行引擎都是同一個。這種底層的統一,使開發者可以在不同的API之間來回切換,你可以選擇一種最自然的方式,來表達你的需求。
本文中所有的示例都使用Spark釋出版本中自帶的示例資料,並且可以在spark-shell、pyspark shell以及sparkR shell中執行。
SQL
Spark SQL的一種用法是直接執行SQL查詢語句,你可使用最基本的SQL語法,也可以選擇HiveQL語法。Spark SQL可以從已有的Hive中讀取資料。更詳細的請參考Hive Tables 這一節。如果用其他程式語言執行SQL,Spark SQL將以DataFrame返回結果。你還可以通過命令列command-line 或者 JDBC/ODBC 使用Spark SQL。
DataFrames
DataFrame是一種分散式資料集合,每一條資料都由幾個命名欄位組成。概念上來說,她和關係型資料庫的表 或者 R和Python中的data frame等價,只不過在底層,DataFrame採用了更多優化。DataFrame可以從很多資料來源(
DataFrame API支援Scala, Java, Python, and R。
Datasets
Dataset是Spark-1.6新增的一種API,目前還是實驗性的。Dataset想要把RDD的優勢(強型別,可以使用lambda表示式函式)和Spark SQL的優化執行引擎的優勢結合到一起。Dataset可以由JVM物件構建(constructed )得到,而後Dataset上可以使用各種transformation運算元(map,flatMap,filter 等)。
Dataset API 對
入門
入口:SQLContext
Spark SQL所有的功能入口都是SQLContext
類,及其子類。不過要建立一個SQLContext物件,首先需要有一個SparkContext物件。
val sc: SparkContext // 假設已經有一個 SparkContext 物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 用於包含RDD到DataFrame隱式轉換操作
import sqlContext.implicits._
除了SQLContext之外,你也可以建立HiveContext,HiveContext是SQLContext 的超集。
除了SQLContext的功能之外,HiveContext還提供了完整的HiveQL語法,UDF使用,以及對Hive表中資料的訪問。要使用HiveContext,你並不需要安裝Hive,而且SQLContext能用的資料來源,HiveContext也一樣能用。HiveContext是單獨打包的,從而避免了在預設的Spark釋出版本中包含所有的Hive依賴。如果這些依賴對你來說不是問題(不會造成依賴衝突等),建議你在Spark-1.3之前使用HiveContext。而後續的Spark版本,將會逐漸把SQLContext升級到和HiveContext功能差不多的狀態。
spark.sql.dialect選項可以指定不同的SQL變種(或者叫SQL方言)。這個引數可以在SparkContext.setConf裡指定,也可以通過 SQL語句的SET key=value命令指定。對於SQLContext,該配置目前唯一的可選值就是”sql”,這個變種使用一個Spark SQL自帶的簡易SQL解析器。而對於HiveContext,spark.sql.dialect 預設值為”hiveql”,當然你也可以將其值設回”sql”。僅就目前而言,HiveSQL解析器支援更加完整的SQL語法,所以大部分情況下,推薦使用HiveContext。
建立DataFrame
Spark應用可以用SparkContext建立DataFrame,所需的資料來源可以是已有的RDD(existing RDD
),或者Hive表,或者其他資料來源(data sources.)
以下是一個從JSON檔案建立DataFrame的小栗子:
val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// 將DataFrame內容列印到stdout
df.show()
DataFrame操作
DataFrame提供了結構化資料的領域專用語言支援,包括Scala, Java, Python and R.
這裡我們給出一個結構化資料處理的基本示例:
val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 建立一個 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// 展示 DataFrame 的內容
df.show()
// age name
// null Michael
// 30 Andy
// 19 Justin
// 列印資料樹形結構
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// select "name" 欄位
df.select("name").show()
// name
// Michael
// Andy
// Justin
// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// 篩選出年齡大於21的人
df.filter(df("age") > 21).show()
// age name
// 30 Andy
// 計算各個年齡的人數
df.groupBy("age").count().show()
// age count
// null 1
// 19 1
// 30 1
除了簡單的欄位引用和表示式支援之外,DataFrame還提供了豐富的工具函式庫,包括字串組裝,日期處理,常見的數學函式等。完整列表見這裡:DataFrame Function Reference.
程式設計方式執行SQL查詢
SQLContext.sql可以執行一個SQL查詢,並返回DataFrame結果。
val sqlContext = ... // 已有一個 SQLContext 物件
val df = sqlContext.sql("SELECT * FROM table")
建立Dataset
Dataset API和RDD類似,不過Dataset不使用Java序列化或者Kryo,而是使用專用的編碼器(Encoder )來序列化物件和跨網路傳輸通訊。如果這個編碼器和標準序列化都能把物件轉位元組,那麼編碼器就可以根據程式碼動態生成,並使用一種特殊資料格式,這種格式下的物件不需要反序列化回來,就能允許Spark進行操作,如過濾、排序、雜湊等。
// 對普通型別資料的Encoder是由 importing sqlContext.implicits._ 自動提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)
// 以下這行不僅定義了case class,同時也自動為其建立了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()
// DataFrame 只需提供一個和資料schema對應的class即可轉換為 Dataset。Spark會根據欄位名進行對映。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
和RDD互操作
Spark SQL有兩種方法將RDD轉為DataFrame。
1. 使用反射機制,推導包含指定型別物件RDD的schema。這種基於反射機制的方法使程式碼更簡潔,而且如果你事先知道資料schema,推薦使用這種方式;
2. 程式設計方式構建一個schema,然後應用到指定RDD上。這種方式更囉嗦,但如果你事先不知道資料有哪些欄位,或者資料schema是執行時讀取進來的,那麼你很可能需要用這種方式。
利用反射推導schema
Spark SQL的Scala介面支援自動將包含case class物件的RDD轉為DataFrame。對應的case class定義了表的schema。case class的引數名通過反射,對映為表的欄位名。case class還可以巢狀一些複雜型別,如Seq和Array。RDD隱式轉換成DataFrame後,可以進一步註冊成表。隨後,你就可以對錶中資料使用SQL語句查詢了。
// sc 是已有的 SparkContext 物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 為了支援RDD到DataFrame的隱式轉換
import sqlContext.implicits._
// 定義一個case class.
// 注意:Scala 2.10的case class最多支援22個欄位,要繞過這一限制,
// 你可以使用自定義class,並實現Product介面。當然,你也可以改用程式設計方式定義schema
case class Person(name: String, age: Int)
// 建立一個包含Person物件的RDD,並將其註冊成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// sqlContext.sql方法可以直接執行SQL語句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// SQL查詢的返回結果是一個DataFrame,且能夠支援所有常見的RDD運算元
// 查詢結果中每行的欄位可以按欄位索引訪問:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// 或者按欄位名訪問:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] 會一次性返回多列,並以Map[String, T]為返回結果型別
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回結果: Map("name" -> "Justin", "age" -> 19)
程式設計方式定義Schema
如果不能事先通過case class定義schema(例如,記錄的欄位結構是儲存在一個字串,或者其他文字資料集中,需要先解析,又或者欄位對不同使用者有所不同),那麼你可能需要按以下三個步驟,以程式設計方式的建立一個DataFrame:
- 從已有的RDD建立一個包含Row物件的RDD
- 用StructType建立一個schema,和步驟1中建立的RDD的結構相匹配
- 把得到的schema應用於包含Row物件的RDD,呼叫這個方法來實現這一步:SQLContext.createDataFrame
For example:
例如:
// sc 是已有的SparkContext物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 建立一個RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// 資料的schema被編碼與一個字串中
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL 各個資料型別
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// 基於前面的字串生成schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 將RDD[people]的各個記錄轉換為Rows,即:得到一個包含Row物件的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// 將schema應用到包含Row物件的RDD上,得到一個DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// 將DataFrame註冊為table
peopleDataFrame.registerTempTable("people")
// 執行SQL語句
val results = sqlContext.sql("SELECT name FROM people")
// SQL查詢的結果是DataFrame,且能夠支援所有常見的RDD運算元
// 並且其欄位可以以索引訪問,也可以用欄位名訪問
results.map(t => "Name: " + t(0)).collect().foreach(println)
資料來源
Spark SQL支援基於DataFrame操作一系列不同的資料來源。DataFrame既可以當成一個普通RDD來操作,也可以將其註冊成一個臨時表來查詢。把DataFrame註冊為table之後,你就可以基於這個table執行SQL語句了。本節將描述載入和儲存資料的一些通用方法,包含了不同的Spark資料來源,然後深入介紹一下內建資料來源可用選項。
通用載入/儲存函式
在最簡單的情況下,所有操作都會以預設型別資料來源來載入資料(預設是Parquet,除非修改了spark.sql.sources.default 配置)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手動指定選項
你也可以手動指定資料來源,並設定一些額外的選項引數。資料來源可由其全名指定(如,org.apache.spark.sql.parquet),而對於內建支援的資料來源,可以使用簡寫名(json, parquet, jdbc)。任意型別資料來源建立的DataFrame都可以用下面這種語法轉成其他型別資料格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
直接對檔案使用SQL
Spark SQL還支援直接對檔案使用SQL查詢,不需要用read方法把檔案載入進來。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
儲存模式
Save操作有一個可選引數SaveMode,用這個引數可以指定如何處理資料已經存在的情況。很重要的一點是,這些儲存模式都沒有加鎖,所以其操作也不是原子性的。另外,如果使用Overwrite模式,實際操作是,先刪除資料,再寫新資料。
僅Scala/Java | 所有支援的語言 | 含義 |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" (default) |
(預設模式)從DataFrame向資料來源儲存資料時,如果資料已經存在,則拋異常。 |
SaveMode.Append |
"append" |
如果資料或表已經存在,則將DataFrame的資料追加到已有資料的尾部。 |
SaveMode.Overwrite |
"overwrite" |
如果資料或表已經存在,則用DataFrame資料覆蓋之。 |
SaveMode.Ignore |
"ignore" |
如果資料已經存在,那就放棄儲存DataFrame資料。這和SQL裡CREATE TABLE IF NOT EXISTS有點類似。 |
儲存到持久化表
在使用HiveContext的時候,DataFrame可以用saveAsTable方法,將資料儲存成持久化的表。與registerTempTable不同,saveAsTable會將DataFrame的實際資料內容儲存下來,並且在HiveMetastore中建立一個遊標指標。持久化的表會一直保留,即使Spark程式重啟也沒有影響,只要你連線到同一個metastore就可以讀取其資料。讀取持久化表時,只需要用用表名作為引數,呼叫SQLContext.table方法即可得到對應DataFrame。
預設情況下,saveAsTable會建立一個”managed table“,也就是說這個表資料的位置是由metastore控制的。同樣,如果刪除表,其資料也會同步刪除。
Parquet檔案
Parquet 是一種流行的列式儲存格式。Spark SQL提供對Parquet檔案的讀寫支援,而且Parquet檔案能夠自動儲存原始資料的schema。寫Parquet檔案的時候,所有的欄位都會自動轉成nullable,以便向後相容。
程式設計方式載入資料
仍然使用上面例子中的資料:
// 我們繼續沿用之前例子中的sqlContext物件
// 為了支援RDD隱式轉成DataFrame
import sqlContext.implicits._
val people: RDD[Person] = ... // 和上面例子中相同,一個包含case class物件的RDD
// 該RDD將隱式轉成DataFrame,然後儲存為parquet檔案
people.write.parquet("people.parquet")
// 讀取上面儲存的Parquet檔案(多個檔案 - Parquet儲存完其實是很多個檔案)。Parquet檔案是自描述的,檔案中儲存了schema資訊
// 載入Parquet檔案,並返回DataFrame結果
val parquetFile = sqlContext.read.parquet("people.parquet")
// Parquet檔案(多個)可以註冊為臨時表,然後在SQL語句中直接查詢
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
分割槽發現
像Hive這樣的系統,一個很常用的優化手段就是表分割槽。在一個支援分割槽的表中,資料是儲存在不同的目錄中的,並且將分割槽鍵以編碼方式儲存在各個分割槽目錄路徑中。Parquet資料來源現在也支援自動發現和推導分割槽資訊。例如,我們可以把之前用的人口資料存到一個分割槽表中,其目錄結構如下所示,其中有2個額外的欄位,gender和country,作為分割槽鍵:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
在這個例子中,如果需要讀取Parquet檔案資料,我們只需要把 path/to/table 作為引數傳給 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL能夠自動的從路徑中提取出分割槽資訊,隨後返回的DataFrame的schema如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
注意,分割槽鍵的資料型別將是自動推匯出來的。目前,只支援數值型別和字串型別資料作為分割槽鍵。
有的使用者可能不想要自動推匯出來的分割槽鍵資料型別。這種情況下,你可以通過 spark.sql.sources.partitionColumnTypeInference.enabled (預設是true)來禁用分割槽鍵型別推導。禁用之後,分割槽鍵總是被當成字串型別。
從Spark-1.6.0開始,分割槽發現預設只在指定目錄的子目錄中進行。以上面的例子來說,如果使用者把 path/to/table/gender=male 作為引數傳給 SQLContext.read.parquet 或者 SQLContext.read.load,那麼gender就不會被作為分割槽鍵。如果使用者想要指定分割槽發現的基礎目錄,可以通過basePath選項指定。例如,如果把 path/to/table/gender=male作為資料目錄,並且將basePath設為 path/to/table,那麼gender仍然會最為分割槽鍵。
Schema合併
像ProtoBuffer、Avro和Thrift一樣,Parquet也支援schema演變。使用者從一個簡單的schema開始,逐漸增加所需的新欄位。這樣的話,使用者最終會得到多個schema不同但互相相容的Parquet檔案。目前,Parquet資料來源已經支援自動檢測這種情況,併合並所有檔案的schema。
因為schema合併相對代價比較大,並且在多數情況下不是必要的,所以從Spark-1.5.0之後,預設是被禁用的。你可以這樣啟用這一功能:
- 讀取Parquet檔案時,將選項mergeSchema設為true(見下面的示例程式碼)
- 或者,將全域性選項spark.sql.parquet.mergeSchema設為true
// 繼續沿用之前的sqlContext物件
// 為了支援RDD隱式轉換為DataFrame
import sqlContext.implicits._
// 建立一個簡單的DataFrame,存到一個分割槽目錄中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// 建立另一個DataFrame放到新的分割槽目錄中,
// 並增加一個新欄位,丟棄一個老欄位
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// 讀取分割槽表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// 最終的schema將由3個欄位組成(single,double,triple)
// 並且分割槽鍵出現在目錄路徑中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
Hive metastore Parquet table轉換
在讀寫Hive metastore Parquet 表時,Spark SQL用的是內部的Parquet支援庫,而不是Hive SerDe,因為這樣效能更好。這一行為是由spark.sql.hive.convertMetastoreParquet 配置項來控制的,而且預設是啟用的。
Hive/Parquet schema調和
Hive和Parquet在表結構處理上主要有2個不同點:
- Hive大小寫敏感,而Parquet不是
- Hive所有欄位都是nullable的,而Parquet需要顯示設定
由於以上原因,我們必須在Hive metastore Parquet table轉Spark SQL Parquet table的時候,對Hive metastore schema做調整,調整規則如下:
- 兩種schema中欄位名和欄位型別必須一致(不考慮nullable)。調和後的欄位型別必須在Parquet格式中有相對應的資料型別,所以nullable是也是需要考慮的。
- 調和後Spark SQL Parquet table schema將包含以下欄位:
- 只出現在Parquet schema中的欄位將被丟棄
- 只出現在Hive metastore schema中的欄位將被新增進來,並顯式地設為nullable。
重新整理元資料
Spark SQL會快取Parquet元資料以提高效能。如果Hive metastore Parquet table轉換被啟用的話,那麼轉換過來的schema也會被快取。這時候,如果這些表由Hive或其他外部工具更新了,你必須手動重新整理元資料。
// 注意,這裡sqlContext是一個HiveContext
sqlContext.refreshTable("my_table")
配置
Parquet配置可以通過 SQLContext.setConf 或者 SQL語句中 SET key=value來指定。
屬性名 | 預設值 | 含義 |
---|---|---|
spark.sql.parquet.binaryAsString |
false | 有些老系統,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不區分二進位制資料和字串型別資料。這個標誌的意思是,讓Spark SQL把二進位制資料當字串處理,以相容老系統。 |
spark.sql.parquet.int96AsTimestamp |
true | 有些老系統,如:特定版本的Impala,Hive,把時間戳存成INT96。這個配置的作用是,讓Spark SQL把這些INT96解釋為timestamp,以相容老系統。 |
spark.sql.parquet.cacheMetadata |
true | 快取Parquet schema元資料。可以提升查詢靜態資料的速度。 |
spark.sql.parquet.compression.codec |
gzip | 設定Parquet檔案的壓縮編碼格式。可接受的值有:uncompressed, snappy, gzip(預設), lzo |
spark.sql.parquet.filterPushdown |
true | 啟用過濾器下推優化,可以講過濾條件儘量推導最下層,已取得效能提升 |
spark.sql.hive.convertMetastoreParquet |
true | 如果禁用,Spark SQL將使用Hive SerDe,而不是內建的對Parquet tables的支援 |
spark.sql.parquet.output.committer.class |
org.apache.parquet.hadoop. |
Parquet使用的資料輸出類。這個類必須是 org.apache.hadoop.mapreduce.OutputCommitter的子類。一般來說,它也應該是 org.apache.parquet.hadoop.ParquetOutputCommitter的子類。注意:1. 如果啟用spark.speculation, 這個選項將被自動忽略
2. 這個選項必須用hadoop configuration設定,而不是Spark SQLConf 3. 這個選項會覆蓋 spark.sql.sources.outputCommitterClass Spark SQL有一個內建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 這個類的在輸出到S3的時候比預設的ParquetOutputCommitter類效率高。 |
spark.sql.parquet.mergeSchema |
false |
如果設為true,那麼Parquet資料來源將會merge 所有資料檔案的schema,否則,schema是從summary file獲取的(如果summary file沒有設定,則隨機選一個) |
JSON資料集
Spark SQL在載入JSON資料的時候,可以自動推導其schema並返回DataFrame。用SQLContext.read.json讀取一個包含String的RDD或者JSON檔案,即可實現這一轉換。
注意,通常所說的json檔案只是包含一些json資料的檔案,而不是我們所需要的JSON格式檔案。JSON格式檔案必須每一行是一個獨立、完整的的JSON物件。因此,一個常規的多行json檔案經常會載入失敗。
// sc是已有的SparkContext物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 資料集是由路徑指定的
// 路徑既可以是單個檔案,也可以還是儲存文字檔案的目錄
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// 推匯出來的schema,可由printSchema打印出來
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// 將DataFrame註冊為table
people.registerTempTable("people")
// 跑SQL語句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 另一種方法是,用一個包含JSON字串的RDD來建立DataFrame
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Hive表
Spark SQL支援從Apache Hive讀寫資料。然而,Hive依賴項太多,所以沒有把Hive包含在預設的Spark釋出包裡。要支援Hive,需要在編譯spark的時候增加-Phive和-Phive-thriftserver標誌。這樣編譯打包的時候將會把Hive也包含進來。注意,hive的jar包也必須出現在所有的worker節點上,訪問Hive資料時候會用到(如:使用hive的序列化和反序列化SerDes時)。
Hive配置在conf/目錄下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)檔案中。請注意,如果在YARN cluster(yarn-cluster mode)模式下執行一個查詢的話,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必須在驅動器(driver)和所有執行器(executor)都可用。一種簡便的方法是,通過spark-submit命令的–jars和–file選項來提交這些檔案。
如果使用Hive,則必須構建一個HiveContext,HiveContext是派生於SQLContext的,添加了在Hive Metastore裡查詢表的支援,以及對HiveQL的支援。使用者沒有現有的Hive部署,也可以建立一個HiveContext。如果沒有在hive-site.xml裡配置,那麼HiveContext將會自動在當前目錄下建立一個metastore_db目錄,再根據HiveConf設定建立一個warehouse目錄(預設/user/hive/warehourse)。所以請注意,你必須把/user/hive/warehouse的寫許可權賦予啟動spark應用程式的使用者。
// sc是一個已有的SparkContext物件
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 這裡用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
和不同版本的Hive Metastore互動
Spark SQL對Hive最重要的支援之一就是和Hive metastore進行互動,這使得Spark SQL可以訪問Hive表的元資料。從Spark-1.4.0開始,Spark SQL有專門單獨的二進位制build版本,可以用來訪問不同版本的Hive metastore,其配置表如下。注意,不管所訪問的hive是什麼版本,Spark SQL內部都是以Hive 1.2.1編譯的,而且內部使用的Hive類也是基於這個版本(serdes,UDFs,UDAFs等)
以下選項可用來配置Hive版本以便訪問其元資料:
屬性名 | 預設值 | 含義 |
---|---|---|
spark.sql.hive.metastore.version |
1.2.1 |
Hive metastore版本,可選的值為0.12.0 到 1.2.1 |
spark.sql.hive.metastore.jars |
builtin |
初始化HiveMetastoreClient的jar包。這個屬性可以是以下三者之一:
目前內建為使用Hive-1.2.1,編譯的時候啟用-Phive,則會和spark一起打包。如果沒有-Phive,那麼spark.sql.hive.metastore.version要麼是1.2.1,要就是未定義
使用maven倉庫下載的jar包版本。這個選項建議不要再生產環境中使用
|
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一個逗號分隔的類名字首列表,這些類使用classloader載入,且可以在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就需要這種共享。其他需要共享的類,是與某些已經共享的類有互動的類。例如,自定義的log4j appender |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
一個逗號分隔的類名字首列表,這些類在每個Spark SQL所訪問的Hive版本中都會被顯式的reload。例如,某些在共享字首列表(spark.sql.hive.metastore.sharedPrefixes)中宣告為共享的Hive UD函式 |
用JDBC連線其他資料庫
Spark SQL也可以用JDBC訪問其他資料庫。這一功能應該優先於使用JdbcRDD。因為它返回一個DataFrame,而DataFrame在Spark SQL中操作更簡單,且更容易和來自其他資料來源的資料進行互動關聯。JDBC資料來源在java和python中用起來也很簡單,不需要使用者提供額外的ClassTag。(注意,這與Spark SQL JDBC server不同,Spark SQL JDBC server允許其他應用執行Spark SQL查詢)
首先,你需要在spark classpath中包含對應資料庫的JDBC driver,下面這行包括了用於訪問postgres的資料庫driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
遠端資料庫的表可以通過Data Sources API,用DataFrame或者SparkSQL 臨時表來裝載。以下是選項列表:
屬性名 | 含義 |
---|---|
url |
需要連線的JDBC URL |
dbtable |
需要讀取的JDBC表。注意,任何可以填在SQL的where子句中的東西,都可以填在這裡。(既可以填完整的表名,也可填括號括起來的子查詢語句) |
driver |
JDBC driver的類名。這個類必須在master和worker節點上都可用,這樣各個節點才能將driver註冊到JDBC的子系統中。 |
partitionColumn, lowerBound, upperBound, numPartitions |
這幾個選項,如果指定其中一個,則必須全部指定。他們描述了多個worker如何並行的讀入資料,並將表分割槽。partitionColumn必須是所查詢的表中的一個數值欄位。注意,lowerBound和upperBound只是用於決定分割槽跨度的,而不是過濾表中的行。因此,表中所有的行都會被分割槽然後返回。 |
fetchSize |
JDBC fetch size,決定每次獲取多少行資料。在JDBC驅動上設成較小的值有利於效能優化(如,Oracle上設為10) |
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql:dbserver",
"dbtable" -> "schema.tablename")).load()
疑難解答
- JDBC driver class必須在所有client session或者executor上,對java的原生classloader可見。這是因為Java的DriverManager在開啟一個連線之前,會做安全檢查,並忽略所有對原聲classloader不可見的driver。最簡單的一種方法,就是在所有worker節點上修改compute_classpath.sh,幷包含你所需的driver jar包。
- 一些資料庫,如H2,會把所有的名字轉大寫。對於這些資料庫,在Spark SQL中必須也使用大寫。
效能調整
對於有一定計算量的Spark作業來說,可能的效能改進的方式,不是把資料快取在記憶體裡,就是調整一些開銷較大的選項引數。
記憶體快取
Spark SQL可以通過呼叫SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列儲存格式快取到記憶體中。隨後,Spark SQL將會掃描必要的列,並自動調整壓縮比例,以減少記憶體佔用和GC壓力。你也可以用SQLContext.uncacheTable(“tableName”)來刪除記憶體中的table。
你還可以使用SQLContext.setConf 或在SQL語句中執行SET key=value命令,來配置記憶體中的快取。
屬性名 | 預設值 | 含義 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 如果設定為true,Spark SQL將會根據資料統計資訊,自動為每一列選擇單獨的壓縮編碼方式。 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式快取批量的大小。增大批量大小可以提高記憶體利用率和壓縮率,但同時也會帶來OOM(Out Of Memory)的風險。 |
其他配置選項
以下選項同樣也可以用來給查詢任務調效能。不過這些選項在未來可能被放棄,因為spark將支援越來越多的自動優化。
屬性名 | 預設值 | 含義 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 配置join操作時,能夠作為廣播變數的最大table的大小。設定為-1,表示禁用廣播。注意,目前的元資料統計僅支援Hive metastore中的表,並且需要執行這個命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan |
spark.sql.tungsten.enabled |
true | 設為true,則啟用優化的Tungsten物理執行後端。Tungsten會顯式的管理記憶體,並動態生成表示式求值的位元組碼 |
spark.sql.shuffle.partitions |
200 | 配置資料混洗(shuffle)時(join或者聚合操作),使用的分割槽數。 |
分散式SQL引擎
Spark SQL可以作為JDBC/ODBC或者命令列工具的分散式查詢引擎。在這種模式下,終端使用者或應用程式,無需寫任何程式碼,就可以直接在Spark SQL中執行SQL查詢。
執行Thrift JDBC/ODBC server
這裡實現的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2
是相同的。你可以使用beeline指令碼來測試Spark或者Hive-1.2.1的JDBC server。
在Spark目錄下執行下面這個命令,啟動一個JDBC/ODBC server
./sbin/start-thriftserver.sh
這個指令碼能接受所有 bin/spark-submit 命令支援的選項引數,外加一個 –hiveconf 選項,來指定Hive屬性。執行./sbin/start-thriftserver.sh –help可以檢視完整的選項列表。預設情況下,啟動的server將會在localhost:10000埠上監聽。要改變監聽主機名或埠,可以用以下環境變數:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...
或者Hive系統屬性 來指定
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...
接下來,你就可以開始在beeline中測試這個Thrift JDBC/ODBC server:
./bin/beeline
下面的指令,可以連線到一個JDBC/ODBC server
beeline> !connect jdbc:hive2://localhost:10000
可能需要輸入使用者名稱和密碼。在非安全模式下,只要輸入你本機的使用者名稱和一個空密碼即可。對於安全模式,請參考beeline documentation.
Hive的配置是在conf/目錄下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。
你也可以在beeline的指令碼中指定。
Thrift JDBC server也支援通過HTTP傳輸Thrift RPC訊息。以下配置(在conf/hive-site.xml中)將啟用HTTP模式:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
同樣,在beeline中也可以用HTTP模式連線JDBC/ODBC server:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
使用Spark SQL命令列工具
Spark SQL CLI是一個很方便的工具,它可以用local mode執行hive metastore service,並且在命令列中執行輸入的查詢。注意Spark SQL CLI目前還不支援和Thrift JDBC server通訊。
用如下命令,在spark目錄下啟動一個Spark SQL CLI
./bin/spark-sql
Hive配置在conf目錄下hive-site.xml,core-site.xml,hdfs-site.xml中設定。你可以用這個命令檢視完整的選項列表:./bin/spark-sql –help
升級指南
1.5升級到1.6
- 從Spark-1.6.0起,預設Thrift server 將運行於多會話並存模式下(multi-session)。這意味著,每個JDBC/ODBC連線有其獨立的SQL配置和臨時函式登錄檔。table的快取仍然是公用的。如果你更喜歡老的單會話模式,只需設定spark.sql.hive.thriftServer.singleSession為true即可。當然,你也可在spark-defaults.conf中設定,或者將其值傳給start-thriftserver.sh –conf(如下):
./sbin/start-thriftserver.sh \
--conf spark.sql.hive.thriftServer.singleSession=true \
...
1.4升級到1.5
- Tungsten引擎現在預設是啟用的,Tungsten是通過手動管理記憶體優化執行計劃,同時也優化了表示式求值的程式碼生成。這兩個特性都可以通過把spark.sql.tungsten.enabled設為false來禁用。
- Parquet schema merging預設不啟用。需要啟用的話,設定spark.sql.parquet.mergeSchema為true即可
- Python介面支援用點(.)來訪問欄位內嵌值,例如df[‘table.column.nestedField’]。但這也意味著,如果你的欄位名包含點號(.)的話,你就必須用重音符來轉義,如:table.`column.with.dots`.nested。
- 列式儲存記憶體分割槽剪枝預設是啟用的。要禁用,設定spark.sql.inMemoryColumarStorage.partitionPruning為false即可
- 不再支援無精度限制的decimal。Spark SQL現在強制最大精度為38位。對於BigDecimal物件,型別推導將會使用(38,18)精度的decimal型別。如果DDL中沒有指明精度,預設使用的精度是(10,0)
- 時間戳精確到1us(微秒),而不是1ns(納秒)
- 在“sql”這個SQL變種設定中,浮點數將被解析為decimal。HiveQL解析保持不變。
- 標準SQL/DataFrame函式均為小寫,例如:sum vs SUM。
- 當推測任務被啟用是,使用DirectOutputCommitter是不安全的,因此,DirectOutputCommitter在推測任務啟用時,將被自動禁用,且忽略相關配置。
- JSON資料來源不再自動載入其他程式產生的新檔案(例如,不是Spark SQL插入到dataset中的檔案)。對於一個JSON的持久化表(如:Hive metastore中儲存的表),使用者可以使用REFRESH TABLE這個SQL命令或者HiveContext.refreshTable來把新檔案包括進來。
1.3升級到1.4
DataFrame資料讀寫介面
根據使用者的反饋,我們提供了一個新的,更加流暢的API,用於資料讀(SQLContext.read)寫(DataFrame.write),同時老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)將被廢棄。
有關SQLContext.read和DataFrame.write的更詳細資訊,請參考API文件。
DataFrame.groupBy保留分組欄位
根據使用者的反饋,我們改變了DataFrame.groupBy().agg()的預設行為,在返回的DataFrame結果中保留了分組欄位。如果你想保持1.3中的行為,設定spark.sql.retainGroupColumns為false即可。
// 在1.3.x中,如果要保留分組欄位"department", 你必須顯式的在agg聚合時包含這個欄位
df.groupBy("department").agg($"department", max("age"), sum("expense"))
// 而在1.4+,分組欄位"department"預設就會包含在返回的DataFrame中
df.groupBy("department").agg(max("age"), sum("expense"))
// 要回滾到1.3的行為(不包含分組欄位),按如下設定即可:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
1.2升級到1.3
在Spark 1.3中,我們去掉了Spark SQL的”Alpha“標籤,並清理了可用的API。從Spark 1.3起,Spark SQL將對1.x系列二進位制相容。這個相容性保證不包括顯式的標註為”unstable(如:DeveloperAPI或Experimental)“的API。
SchemaRDD重新命名為DataFrame
對於使用者來說,Spark SQL 1.3最大的改動就是SchemaRDD改名為DataFrame。主要原因是,DataFrame不再直接由RDD派生,而是通過自己的實現提供RDD的功能。DataFrame只需要呼叫其rdd方法就能轉成RDD。
在Scala中仍然有SchemaRDD,只不過這是DataFrame的一個別名,以便相容一些現有程式碼。但仍然建議使用者改用DataFrame。Java和Python使用者就沒這個福利了,他們必須改程式碼。
統一Java和Scala API
在Spark 1.3之前,有單獨的java相容類(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的映象。Spark 1.3中將Java API和Scala API統一。兩種語言的使用者都應該使用SQLContext和DataFrame。一般這些類中都會使用兩種語言中都有的型別(如:Array取代各語言獨有的集合)。有些情況下,沒有通用的型別(例如:閉包或者maps),將會使用函式過載來解決這個問題。
另外,java特有的型別API被刪除了。Scala和java使用者都應該用org.apache.spark.sql.types來程式設計描述一個schema。
隱式轉換隔離,DSL包移除 – 僅針對scala
Spark 1.3之前的很多示例程式碼,都在開頭用 import sqlContext._,這行將會導致所有的sqlContext的函式都被引入進來。因此,在Spark 1.3我們把RDDs到DataFrames的隱式轉換隔離出來,單獨放到SQLContext.implicits物件中。使用者現在應該這樣寫:import sqlContext.implicits._
另外,隱式轉換也支援由Product(如:case classes或tuples)組成的RDD,但需要呼叫一個toDF方法,而不是自動轉換。
如果需要使用DSL(被DataFrame取代的API)中的方法,使用者之前需要匯入DSL(import org.apache.spark.sql.catalyst.dsl), 而現在應該要匯入 DataFrame API(import org.apache.spark.sql.functions._)
移除org.apache.spark.sql中DataType別名 – 僅針對scala
Spark 1.3刪除了sql包中的DataType類型別名。現在,使用者應該使用 org.apache.spark.sql.types中的類。
UDF註冊挪到sqlContext.udf中 – 針對java和scala
註冊UDF的函式,不管是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。
sqlContext.udf.register("strLen", (s: String) => s.length())
Python UDF註冊保持不變。
Python DataTypes不再是單例
在python中使用DataTypes,你需要先構造一個物件(如:StringType()),而不是引用一個單例。
Shark使用者遷移指南
排程
使用者可以通過如下命令,為JDBC客戶端session設定一個Fair Scheduler pool。
SET spark.sql.thriftserver.scheduler.pool=accounting;
Reducer個數
在Shark中,預設的reducer個數是1,並且由mapred.reduce.tasks設定。Spark SQL廢棄了這個屬性,改為 spark.sql.shuffle.partitions, 並且預設200,使用者可通過如下SET命令來自定義:
SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;
你也可以把這個屬性放到hive-site.xml中來覆蓋預設值。
目前,mapred.reduce.tasks屬性仍然能被識別,並且自動轉成spark.sql.shuffle.partitions
快取
shark.cache表屬性已經不存在了,並且以”_cached”結尾命名的表也不再會自動快取。取而代之的是,CACHE TABLE和UNCACHE TABLE語句,用以顯式的控制表的快取:
CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;
注意:CACHE TABLE tbl 現在預設是飢餓模式,而非懶惰模式。再也不需要手動呼叫其他action來觸發cache了!
從Spark-1.2.0開始,Spark SQL新提供了一個語句,讓使用者自己控制表快取是否是懶惰模式
CACHE [LAZY] TABLE [AS SELECT] ...
以下幾個快取相關的特性不再支援:
- 使用者定義分割槽級別的快取逐出策略
- RDD 重載入
- 記憶體快取直接寫入策略
相容Apache Hive
部署在已有的Hive倉庫之上
Spark SQL Thrift JDBC server採用了”out of the box”(開箱即用)的設計,使用很方便,併兼容已有的Hive安裝版本。你不需要修改已有的Hive metastore或者改變資料的位置,或者表分割槽。
支援的Hive功能
Spark SQL 支援絕大部分Hive功能,如:
- Hive查詢語句:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
- 所有的Hive操作符:
- Relational operators (
=
,⇔
,==
,<>
,<
,>
,>=
,<=
, etc) - Arithmetic operators (
+
,-
,*
,/
,%
, etc) - Logical operators (
AND
,&&
,OR
,||
, etc) - Complex type constructors
- Mathematical functions (
sign
,ln
,cos
, etc) - String functions (
instr
,length
,printf
, etc)
- Relational operators (
- 使用者定義函式(UDF)
- 使用者定義聚合函式(UDAF)
- 使用者定義序列化、反序列化(SerDes)
- 視窗函式(Window functions)
- Joins
JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
- Unions
- 查詢子句
SELECT col FROM ( SELECT a + b AS col from t1) t2
- 取樣
- 執行計劃詳細(Explain)
- 分割槽表,包括動態分割槽插入
- 檢視
- 所有Hive DDL(data definition language):
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
- 絕大部分Hive資料型別:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
不支援的Hive功能
以下是目前不支援的Hive特性的列表。多數是不常用的。
不支援的Hive常見功能
- bucket表:butcket是Hive表的一個雜湊分割槽
不支援的Hive高階功能
- UNION類操作
- 去重join
- 欄位統計資訊收集:Spark SQL不支援同步的欄位統計收集
Hive輸入、輸出格式
- CLI檔案格式:對於需要回顯到CLI中的結果,Spark SQL僅支援TextOutputFormat。
- Hadoop archive — Hadoop歸檔
Hive優化
一些比較棘手的Hive優化目前還沒有在Spark中提供。有一些(如索引)對應Spark SQL這種記憶體計算模型來說並不重要。另外一些,在Spark SQL未來的版本中會支援。
- 塊級別點陣圖索引和虛擬欄位(用來建索引)
- 自動計算reducer個數(join和groupBy運算元):目前在Spark SQL中你需要這樣控制混洗後(post-shuffle)併發程度:”SET spark.sql.shuffle.partitions=[num_tasks];”
- 元資料查詢:只查詢元資料的請求,Spark SQL仍需要啟動任務來計算結果
- 資料傾斜標誌:Spark SQL不會理會Hive中的資料傾斜標誌
STREAMTABLE
join提示:Spark SQL裡沒有這玩藝兒- 返回結果時合併小檔案:如果返回的結果有很多小檔案,Hive有個選項設定,來合併小檔案,以避免超過HDFS的檔案數額度限制。Spark SQL不支援這個。
參考
資料型別
Spark SQL和DataFrames支援如下資料型別:
- Numeric types(數值型別)
ByteType
: 1位元組長的有符號整型,範圍:-128
到127
.ShortType
: 2位元組長有符號整型,範圍:-32768
到32767
.IntegerType
: 4位元組有符號整型,範圍:-2147483648
到2147483647
.LongType
: 8位元組有符號整型,範圍:-9223372036854775808
to9223372036854775807
.FloatType
: 4位元組單精度浮點數。DoubleType
: 8位元組雙精度浮點數DecimalType
: 任意精度有符號帶小數的數值。內部使用java.math.BigDecimal, BigDecimal包含任意精度的不縮放整型,和一個32位的縮放整型
- String type(字串型別)
StringType
: 字串
- Binary type(二進位制型別)
BinaryType
: 位元組序列
- Boolean type(布林型別)
BooleanType
: 布林型別
- Datetime type(日期型別)
TimestampType
: 表示包含年月日、時分秒等欄位的日期DateType
: 表示包含年月日欄位的日期
- Complex types(複雜型別)
ArrayType(elementType, containsNull)
:陣列型別,表達一系列的elementType型別的元素組成的序列,containsNull表示陣列能否包含null值MapType(keyType, valueType, valueContainsNull)
:對映集合型別,表示一個鍵值對的集合。鍵的型別是keyType,值的型別則由valueType指定。對應MapType來說,鍵是不能為null的,而值能否為null則取決於valueContainsNull。StructType(fields):
表示包含StructField序列的結構體。- StructField(name, datatype, nullable): 表示StructType中的一個欄位,name是欄位名,datatype是資料型別,nullable表示該欄位是否可以為空
所有Spark SQL支援的資料型別都在這個包裡:org.apache.spark.sql.types,你可以這樣匯入之:
import org.apache.spark.sql.types._
Data type | Value type in Scala | API to access or create a data type |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |