Using Hive and the ORC Storage Format with Spark
In this blog post, we will look at how to access Hive data through Spark, covering the following topics:
1. How to access Spark interactively through the Spark Shell
2. How to read an HDFS file and create an RDD
3. How to analyze a dataset interactively with the Spark API
4. How to create an ORC-format Hive table
5. How to query Hive tables with Spark SQL
6. How to store data in ORC format
Spark SQL uses the Spark engine to run SQL queries against data stored in HDFS or in existing RDDs, so we can use SQL statements inside a Spark program to work with data.
1. Getting the Dataset
Get the sample data from a Linux server terminal, then upload the downloaded file to a directory in HDFS:
hdfs dfs -put ./yahoo_stocks.csv /tmp/
2. Starting the Spark Shell
spark-shell
This starts spark-shell, and it can interact with Hive because we have already copied hive-site.xml, hdfs-site.xml, and core-site.xml into Spark's conf directory.
Import the required libraries:
scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
3. Creating a SparkSession
Spark 2.0 introduced SparkSession, which has built-in support for Hive features, including running HiveQL, calling Hive UDFs, and reading data from Hive tables.
Create an instance:
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
When we log in through spark-shell, a SparkSession instance named spark is already created for us by default, and we can use it directly from then on:
Spark session available as 'spark'.
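When running outside spark-shell (for example in a standalone application), no spark instance is pre-built, so the session has to be constructed explicitly. A minimal sketch, assuming the Hive configuration files are on the classpath; the application name is arbitrary:
import org.apache.spark.sql.SparkSession

// Build a SparkSession with Hive support enabled (hypothetical standalone app).
val spark = SparkSession.builder()
  .appName("SparkHiveOrcExample")
  .enableHiveSupport()
  .getOrCreate()

// Needed for implicit conversions such as rdd.toDF() used below.
import spark.implicits._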
4. Creating an ORC-Format Table
Create the table in Hive:
scala> spark.sql("create table yahoo_orc_table (date STRING,open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")
res0: org.apache.spark.sql.DataFrame = []
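To confirm that the table was created, one option (a sketch; the output depends on your Hive databases) is to list the tables Spark can see:
scala> spark.sql("show tables").show()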
5. Loading the Data File and Creating an RDD
scala> val yahoo_stocks = sc.textFile("hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv")
yahoo_stocks: org.apache.spark.rdd.RDD[String] =
hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv MapPartitionsRDD[2] at textFile at <console>:30
Take the first 10 rows:
scala> yahoo_stocks.take(10).foreach(println)
Date,Open,High,Low,Close,Volume,AdjClose
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49
2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66
2015-04-17,45.30,45.44,44.25,44.45,13305700,44.45
2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78
6. The First Row of the Data Contains the Field Names
scala> val header = yahoo_stocks.first
header: String = Date,Open,High,Low,Close,Volume,Adj Close
Next, we create a new RDD that excludes the header row:
scala> val data = yahoo_stocks.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:32
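An equivalent way to drop the header (a sketch; the name dataNoHeader is only for illustration) is to filter out the line that equals the header captured above:
scala> val dataNoHeader = yahoo_stocks.filter(line => line != header)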
7. Creating a Schema
scala> case class YahooStockPrice(date: String, open: Float, high: Float, low: Float, close: Float, volume: Integer, adjClose: Float)
defined class YahooStockPrice
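The case class is all this walkthrough needs; for reference, the same schema could also be declared programmatically with StructType (a sketch, not used in the steps below):
import org.apache.spark.sql.types._

// Programmatic equivalent of the YahooStockPrice case class schema.
val yahooSchema = StructType(Seq(
  StructField("date", StringType, nullable = true),
  StructField("open", FloatType, nullable = false),
  StructField("high", FloatType, nullable = false),
  StructField("low", FloatType, nullable = false),
  StructField("close", FloatType, nullable = false),
  StructField("volume", IntegerType, nullable = true),
  StructField("adjClose", FloatType, nullable = false)
))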
8. Binding the Schema to the Processed Data
Create an RDD of YahooStockPrice objects from the data and convert it to a DataFrame (it will be registered as a table in the next step):
scala> val stockprice = data.map(_.split(",")).map(row => YahooStockPrice(row(0), row(1).trim.toFloat, row(2).trim.toFloat, row(3).trim.toFloat, row(4).trim.toFloat, row(5).trim.toInt, row(6).trim.toFloat)).toDF()
stockprice: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]
View the data:
scala> stockprice.first
res4: org.apache.spark.sql.Row = [2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]
View more of the data:
scala> stockprice.show
Verify the schema:
scala> stockprice.printSchema
root
|-- date: string (nullable = true)
|-- open: float (nullable = false)
|-- high: float (nullable = false)
|-- low: float (nullable = false)
|-- close: float (nullable = false)
|-- volume: integer (nullable = true)
|-- adjClose: float (nullable = false)
9. Registering a Temporary Table
scala> stockprice.createOrReplaceTempView("yahoo_stocks_temp")
10. Querying the Temporary Table
Note that this is not a Hive table; it is a temporary in-memory view over the DataFrame:
scala> val results = spark.sql("SELECT * FROM yahoo_stocks_temp")
scala> results.map(t => "Stock Entry: " + t.toString).collect().foreach(println)
……
Stock Entry: [1996-05-06,32.50008,32.50008,29.37504,30.12504,8214400,1.25521]
Stock Entry: [1996-05-03,32.25,32.50008,31.24992,31.99992,6116800,1.33333]
Stock Entry: [1996-05-02,31.5,33.25008,31.5,32.87496,9731200,1.36979]
Stock Entry: [1996-05-01,30.25008,31.75008,30.0,31.62504,4881600,1.31771]
Stock Entry: [1996-04-30,31.24992,31.5,29.50008,29.74992,5003200,1.23958]
……
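Any SQL can be run against the temporary view, not just SELECT *. For example, a hypothetical filtered query using the columns defined above:
scala> spark.sql("SELECT date, close FROM yahoo_stocks_temp WHERE close > 45 ORDER BY close DESC").show(5)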
11. Saving the Data in ORC Format
We now write the data above into a Hive table stored in the ORC file format.
scala> results.write.format("orc").saveAsTable("yahoo_stocks_orc")
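If the table already exists from a previous run, saveAsTable fails by default; a save mode can be supplied (a sketch, with "overwrite" chosen only as an example):
scala> results.write.mode("overwrite").format("orc").saveAsTable("yahoo_stocks_orc")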
12. Reading the ORC Files
scala> val yahoo_stocks_orc = spark.read.format("orc").load("yahoo_stocks_orc")
yahoo_stocks_orc: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]
Register a temporary in-memory table mapped onto this ORC data:
scala> yahoo_stocks_orc.createOrReplaceTempView("orcTest")
Query it:
scala> spark.sql("SELECT * from orcTest").collect.foreach(println)
……
[1996-04-29,31.5,31.99992,30.49992,31.00008,5928000,1.29167]
[1996-04-26,31.99992,32.25,31.24992,31.75008,7561600,1.32292]
[1996-04-25,30.0,32.25,28.99992,31.24992,19478400,1.30208]
[1996-04-24,28.5,29.12496,27.75,28.99992,7795200,1.20833]
……
13. Querying Hive Table Data
When we log in through spark-shell, a spark instance is initialized for us by default:
Spark session available as 'spark'.
We can use spark to access the data in Hive tables.
scala> val tableDF = spark.sql("select * from yahoo_stocks_orc limit 10")
tableDF: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]
View 10 rows of data:
scala> tableDF.take(10).foreach(println)
[2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]
[2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36]
[2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52]
[2015-04-23,43.92,44.06,43.58,43.7,14274900,43.7]
[2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98]
[2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49]
[2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66]
[2015-04-17,45.3,45.44,44.25,44.45,13305700,44.45]
[2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78]
[2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73]
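The same Hive table can also be read through the DataFrame API instead of SQL; a sketch (the filter threshold here is arbitrary):
scala> spark.table("yahoo_stocks_orc").filter($"close" > 45).select("date", "close").show(5)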