1. 程式人生 > >spark 基礎知識- spark SQL專題

spark 基礎知識- spark SQL專題

一、簡介

  Spark SQL是Spark中處理結構化資料的模組。與基礎的Spark RDD API不同,Spark SQL的介面提供了更多關於資料的結構資訊和計算任務的執行時資訊。在Spark內部,Spark SQL會能夠用於做優化的資訊比RDD API更多一些。Spark SQL如今有了三種不同的API:SQL語句、DataFrame API和最新的Dataset API。不過真正執行計算的時候,無論你使用哪種API或語言,Spark SQL使用的執行引擎都是同一個。這種底層的統一,使開發者可以在不同的API之間來回切換,你可以選擇一種最自然的方式,來表達你的需求。
(本文針對spark1.6版本,示例語言為Scala)
二、概念

1. SQL。Spark SQL的一種用法是直接執行SQL查詢語句,你可使用最基本的SQL語法,也可以選擇HiveQL語法。Spark SQL可以從已有的Hive中讀取資料。更詳細的請參考Hive Tables 這一節。如果用其他程式語言執行SQL,Spark SQL將以DataFrame返回結果。你還可以通過命令列command-line 或者 JDBC/ODBC 使用Spark SQL。

2. DataFrame。是一種分散式資料集合,每一條資料都由幾個命名欄位組成。概念上來說,她和關係型資料庫的表 或者 R和Python中的data frame等價,只不過在底層,DataFrame採用了更多優化。DataFrame可以從很多資料來源(sources)載入資料並構造得到,如:結構化資料檔案,Hive中的表,外部資料庫,或者已有的RDD。
DataFrame API支援Scala, Java, Python, and R。


3. Datasets。是Spark-1.6新增的一種API,目前還是實驗性的。Dataset想要把RDD的優勢(強型別,可以使用lambda表示式函式)和Spark SQL的優化執行引擎的優勢結合到一起。Dataset可以由JVM物件構建(constructed )得到,而後Dataset上可以使用各種transformation運算元(map,flatMap,filter 等)。
Dataset API 對 Scala 和 Java的支援介面是一致的,但目前還不支援Python,不過Python自身就有語言動態特性優勢(例如,你可以使用欄位名來訪問資料,row.columnName)。對Python的完整支援在未來的版本會增加進來。
三、建立並操作DataFrame

  Spark應用可以用SparkContext建立DataFrame,所需的資料來源可以是已有的RDD(existing RDD ),或者Hive表,或者其他資料來源(data sources.)以下是一個從JSON檔案建立並操作DataFrame的小例子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 將DataFrame內容列印到stdout
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 列印資料樹形結構
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 欄位
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 篩選出年齡大於21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 計算各個年齡的人數
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

SQLContext.sql可以執行一個SQL查詢,並返回DataFrame結果。

val sqlContext = ... // 已有一個 SQLContext 物件
val df = sqlContext.sql("SELECT * FROM table")

三、spark SQL與RDD互操作

  Spark SQL有兩種方法將RDD轉為DataFrame。分別為反射機制和程式設計方式。
1. 利用反射推導schema。

  Spark SQL的Scala介面支援自動將包含case class物件的RDD轉為DataFrame。對應的case class定義了表的schema。case class的引數名通過反射,對映為表的欄位名。case class還可以巢狀一些複雜型別,如Seq和Array。RDD隱式轉換成DataFrame後,可以進一步註冊成表。隨後,你就可以對錶中資料使用 SQL語句查詢了。

// sc 是已有的 SparkContext 物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 為了支援RDD到DataFrame的隱式轉換
import sqlContext.implicits._

// 定義一個case class.
// 注意:Scala 2.10的case class最多支援22個欄位,要繞過這一限制,Scala-2.11已經解決這個問題

// 你可以使用自定義class,並實現Product介面。當然,你也可以改用程式設計方式定義schema

https://stackoverflow.com/questions/20258417/how-to-get-around-the-scala-case-class-limit-of-22-fields

classDemo(val field1:String,val field2:Int,// .. and so on ..val field23:String)extendsProduct//For Spark it has to be SerializablewithSerializable{def canEqual
(that:Any)= that.isInstanceOf[Demo]def productArity =23// number of columnsdef productElement(idx:Int)= idx match{case0=> field1 case1=> field2 // .. and so on ..case22=> field23 }}

case class Person(name: String, age: Int)

// 建立一個包含Person物件的RDD,並將其註冊成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接執行SQL語句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查詢的返回結果是一個DataFrame,且能夠支援所有常見的RDD運算元
// 查詢結果中每行的欄位可以按欄位索引訪問:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按欄位名訪問:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 會一次性返回多列,並以Map[String, T]為返回結果型別
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回結果: Map("name" -> "Justin", "age" -> 19)

2. 程式設計方式定義Schema。

  如果不能事先通過case class定義schema(例如,記錄的欄位結構是儲存在一個字串,或者其他文字資料集中,需要先解析,又或者欄位對不同使用者有所不同),那麼你可能需要按以下三個步驟,以程式設計方式的建立一個DataFrame:

  從已有的RDD建立一個包含Row物件的RDD,用StructType建立一個schema,和步驟1中建立的RDD的結構相匹配,把得到的schema應用於包含Row物件的RDD,呼叫這個方法來實現這一步:SQLContext.createDataFrame
例如:

// sc 是已有的SparkContext物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 建立一個RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 資料的schema被編碼與一個字串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各個資料型別
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基於前面的字串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 將RDD[people]的各個記錄轉換為Rows,即:得到一個包含Row物件的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 將schema應用到包含Row物件的RDD上,得到一個DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 將DataFrame註冊為table
peopleDataFrame.registerTempTable("people")

// 執行SQL語句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查詢的結果是DataFrame,且能夠支援所有常見的RDD運算元
// 並且其欄位可以以索引訪問,也可以用欄位名訪問
results.map(t => "Name: " + t(0)).collect().foreach(println)

四、spark SQL與其它資料來源的連線與操作

  Spark SQL支援基於DataFrame操作一系列不同的資料來源。DataFrame既可以當成一個普通RDD來操作,也可以將其註冊成一個臨時表來查詢。把 DataFrame註冊為table之後,你就可以基於這個table執行SQL語句了。本節將描述載入和儲存資料的一些通用方法,包含了不同的 Spark資料來源,然後深入介紹一下內建資料來源可用選項。
  在最簡單的情況下,所有操作都會以預設型別資料來源來載入資料(預設是Parquet,除非修改了spark.sql.sources.default 配置)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

  你也可以手動指定資料來源,並設定一些額外的選項引數。資料來源可由其全名指定(如,org.apache.spark.sql.parquet),而 對於內建支援的資料來源,可以使用簡寫名(json, parquet, jdbc)。任意型別資料來源建立的DataFrame都可以用下面這種語法轉成其他型別資料格式。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

  Spark SQL還支援直接對檔案使用SQL查詢,不需要用read方法把檔案載入進來。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

1. 連線JSON資料集

  Spark SQL在載入JSON資料的時候,可以自動推導其schema並返回DataFrame
。用SQLContext.read.json讀取一個包含String的RDD或者JSON檔案,即可實現這一轉換。

注意,通常所說的json檔案只是包含一些json資料的檔案,而不是我們所需要的JSON格式檔案。JSON格式檔案必須每一行是一個獨立、完整的的JSON物件。因此,一個常規的多行json檔案經常會載入失敗。

// sc是已有的SparkContext物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 資料集是由路徑指定的
// 路徑既可以是單個檔案,也可以還是儲存文字檔案的目錄
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// 推匯出來的schema,可由printSchema打印出來
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// 將DataFrame註冊為table
people.registerTempTable("people")

// 跑SQL語句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另一種方法是,用一個包含JSON字串的RDD來建立DataFrame
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

2. 連線Hive表

  Spark SQL支援從Apache Hive讀 寫資料。然而,Hive依賴項太多,所以沒有把Hive包含在預設的Spark釋出包裡。要支援Hive,需要在編譯spark的時候增加-Phive和 -Phive-thriftserver標誌。這樣編譯打包的時候將會把Hive也包含進來。注意,hive的jar包也必須出現在所有的worker節 點上,訪問Hive資料時候會用到(如:使用hive的序列化和反序列化SerDes時)。
  Hive配置在conf/目錄下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配 置)檔案中。請注意,如果在YARN cluster(yarn-cluster mode)模式下執行一個查詢的話,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必須在驅動器(driver)和所有執行器(executor)都可用。一種簡便的方法是,通過 spark-submit命令的–jars和–file選項來提交這些檔案。
  如果使用Hive,則必須構建一個HiveContext,HiveContext是派生於SQLContext的,添加了在Hive Metastore裡查詢表的支援,以及對HiveQL的支援。使用者沒有現有的Hive部署,也可以建立一個HiveContext。如果沒有在 hive-site.xml裡配置,那麼HiveContext將會自動在當前目錄下建立一個metastore_db目錄,再根據HiveConf設定 建立一個warehouse目錄(預設/user/hive/warehourse)。所以請注意,你必須把/user/hive/warehouse的 寫許可權賦予啟動spark應用程式的使用者。

// sc是一個已有的SparkContext物件
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 這裡用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

3. 用JDBC連線其他資料庫

  Spark SQL也可以用JDBC訪問其他資料庫。這一功能應該優先於使用JdbcRDD。因為它返回一個DataFrame,而DataFrame在Spark SQL中操作更簡單,且更容易和來自其他資料來源的資料進行互動關聯。JDBC資料來源在java和python中用起來也很簡單,不需要使用者提供額外的 ClassTag。(注意,這與Spark SQL JDBC server不同,Spark SQL JDBC server允許其他應用執行Spark SQL查詢)
  首先,你需要在spark classpath中包含對應資料庫的JDBC driver,下面這行包括了用於訪問postgres的資料庫driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

注意:

    JDBC driver class必須在所有client session或者executor上,對java的原生classloader可見。這是因為Java的DriverManager在開啟一個連線之 前,會做安全檢查,並忽略所有對原聲classloader不可見的driver。最簡單的一種方法,就是在所有worker節點上修改 compute_classpath.sh,幷包含你所需的driver jar包。
    一些資料庫,如H2,會把所有的名字轉大寫。對於這些資料庫,在Spark SQL中必須也使用大寫。