1. 程式人生 > 其它 >【Spark研究】用Apache Spark進行大資料處理第二部分:Spark SQL

【Spark研究】用Apache Spark進行大資料處理第二部分:Spark SQL

在Apache Spark文章系列的前一篇文章中,我們學習了什麼是Apache Spark框架,以及如何用該框架幫助組織處理大資料處理分析的需求。

Spark SQL,作為Apache Spark大資料框架的一部分,主要用於結構化資料處理和對Spark資料執行類SQL的查詢。通過Spark SQL,可以針對不同格式的資料執行ETL操作(如JSON,Parquet,資料庫)然後完成特定的查詢操作。

在這一文章系列的第二篇中,我們將討論Spark SQL庫,如何使用Spark SQL庫對儲存在批處理檔案、JSON資料集或Hive表中的資料執行SQL查詢。

Spark大資料處理框架目前最新的版本是上個月釋出的Spark 1.3。這一版本之前,Spark SQL模組一直處於“Alpha”狀態,現在該團隊已經從Spark SQL庫上將這一標籤移除。這一版本中包含了許多新的功能特性,其中一部分如下:

  • 資料框架(DataFrame):Spark新版本中提供了可以作為分散式SQL查詢引擎的程式化抽象DataFrame。
  • 資料來源(Data Sources):隨著資料來源API的增加,Spark SQL可以便捷地處理以多種不同格式儲存的結構化資料,如Parquet,JSON以及Apache Avro庫。
  • JDBC伺服器(JDBC Server):內建的JDBC伺服器可以便捷地連線到儲存在關係型資料庫表中的結構化資料並利用傳統的商業智慧(BI)工具進行大資料分析。

Spark SQL元件

使用Spark SQL時,最主要的兩個元件就是DataFrame和SQLContext。

首先,我們來了解一下DataFrame。

DataFrame

DataFrame是一個分散式的,按照命名列的形式組織的資料集合。DataFrame基於R語言中的data frame概念,與關係型資料庫中的資料庫表類似。

之前版本的Spark SQL API中的SchemaRDD已經更名為DataFrame。

通過呼叫將DataFrame的內容作為行RDD(RDD of Rows)返回的rdd方法,可以將DataFrame轉換成RDD。

可以通過如下資料來源建立DataFrame:

  • 已有的RDD
  • 結構化資料檔案
  • JSON資料集
  • Hive表
  • 外部資料庫

Spark SQL和DataFrame API已經在下述幾種程式設計語言中實現:

  • Scala(https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.package)
  • Java(https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/sql/api/java/package-summary.html)
  • Python(https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html)

本文中所涉及的Spark SQL程式碼示例均使用Spark Scala Shell程式。

SQLContext

Spark SQL提供SQLContext封裝Spark中的所有關係型功能。可以用之前的示例中的現有SparkContext建立SQLContext。下述程式碼片段展示瞭如何建立一個SQLContext物件。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

此外,Spark SQL中的HiveContext可以提供SQLContext所提供功能的超集。可以在用HiveQL解析器編寫查詢語句以及從Hive表中讀取資料時使用。

在Spark程式中使用HiveContext無需既有的Hive環境。

JDBC資料來源

Spark SQL庫的其他功能還包括資料來源,如JDBC資料來源。

JDBC資料來源可用於通過JDBC API讀取關係型資料庫中的資料。相比於使用JdbcRDD,應該將JDBC資料來源的方式作為首選,因為JDBC資料來源能夠將結果作為DataFrame物件返回,直接用Spark SQL處理或與其他資料來源連線。

Spark SQL示例應用

在上一篇文章中,我們學習瞭如何在本地環境中安裝Spark框架,如何啟動Spark框架並用Spark Scala Shell與其互動。如需安裝最新版本的Spark,可以從Spark網站下載該軟體。

對於本文中的程式碼示例,我們將使用相同的Spark Shell執行Spark SQL程式。這些程式碼示例適用於Windows環境。

為了確保Spark Shell程式有足夠的記憶體,可以在執行spark-shell命令時,加入driver-memory命令列引數,如下所示:

spark-shell.cmd --driver-memory 1G

Spark SQL應用

Spark Shell啟動後,就可以用Spark SQL API執行資料分析查詢。

在第一個示例中,我們將從文字檔案中載入使用者資料並從資料集中建立一個DataFrame物件。然後執行DataFrame函式,執行特定的資料選擇查詢。

文字檔案customers.txt中的內容如下:

100, John Smith, Austin, TX, 78727
200, Joe Johnson, Dallas, TX, 75201
300, Bob Jones, Houston, TX, 77028
400, Andy Davis, San Antonio, TX, 78227
500, James Williams, Austin, TX, 78727

下述程式碼片段展示了可以在Spark Shell終端執行的Spark SQL命令。

// 首先用已有的Spark Context物件建立SQLContext物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 匯入語句,可以隱式地將RDD轉化成DataFrame
import sqlContext.implicits._

// 建立一個表示客戶的自定義類
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

// 用資料集文字檔案建立一個Customer物件的DataFrame
val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()

// 將DataFrame註冊為一個表
dfCustomers.registerTempTable("customers")

// 顯示DataFrame的內容
dfCustomers.show()

// 列印DF模式
dfCustomers.printSchema()

// 選擇客戶名稱列
dfCustomers.select("name").show()

// 選擇客戶名稱和城市列
dfCustomers.select("name", "city").show()

// 根據id選擇客戶
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()

// 根據郵政編碼統計客戶數量
dfCustomers.groupBy("zip_code").count().show()

在上一示例中,模式是通過反射而得來的。我們也可以通過程式設計的方式指定資料集的模式。這種方法在由於資料的結構以字串的形式編碼而無法提前定義定製類的情況下非常實用。

如下程式碼示例展示瞭如何使用新的資料型別類StructType,StringType和StructField指定模式。

//
// 用程式設計的方式指定模式
//

// 用已有的Spark Context物件建立SQLContext物件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 建立RDD物件
val rddCustomers = sc.textFile("data/customers.txt")

// 用字串編碼模式
val schemaString = "customer_id name city state zip_code"

// 匯入Spark SQL資料型別和Row
import org.apache.spark.sql._

import org.apache.spark.sql.types._;

// 用模式字串生成模式物件
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 將RDD(rddCustomers)記錄轉化成Row。
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))

// 將模式應用於RDD物件。
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// 將DataFrame註冊為表
dfCustomers.registerTempTable("customers")

// 用sqlContext物件提供的sql方法執行SQL語句。
val custNames = sqlContext.sql("SELECT name FROM customers")

// SQL查詢的返回結果為DataFrame物件,支援所有通用的RDD操作。
// 可以按照順序訪問結果行的各個列。
custNames.map(t => "Name: " + t(0)).collect().foreach(println)

// 用sqlContext物件提供的sql方法執行SQL語句。
val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")

// SQL查詢的返回結果為DataFrame物件,支援所有通用的RDD操作。
// 可以按照順序訪問結果行的各個列。
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)

除了文字檔案之外,也可以從其他資料來源中載入資料,如JSON資料檔案,Hive表,甚至可以通過JDBC資料來源載入關係型資料庫表中的資料。

如上所示,Spark SQL提供了十分友好的SQL介面,可以與來自多種不同資料來源的資料進行互動,而且所採用的語法也是團隊熟知的SQL查詢語法。這對於非技術類的專案成員,如資料分析師以及資料庫管理員來說,非常實用。

總結

本文中,我們瞭解到Apache Spark SQL如何用熟知的SQL查詢語法提供與Spark資料互動的SQL介面。Spark SQL是一個功能強大的庫,組織中的非技術團隊成員,如業務分析師和資料分析師,都可以用Spark SQL執行資料分析。

下一篇文章中,我們將討論可用於處理實時資料或流資料的Spark Streaming庫。Spark Streaming庫是任何一個組織的整體資料處理和管理生命週期中另外一個重要的組成部分,因為流資料處理可為我們提供對系統的實時觀察。這對於欺詐檢測、線上交易系統、事件處理解決方案等用例來說至關重要。

參考文獻

  • Spark主站
  • Spark SQL網站
  • Spark SQL程式設計指南
  • 用Apache Spark進行大資料處理——第一部分:入門介紹

來源:http://www.infoq.com/cn/articles/apache-spark-sql