1. 程式人生 > >Spark SQL之外部資料來源

Spark SQL之外部資料來源

概述

  從Spark 1.2版本開始,Spark SQL正式支援外部資料來源。它可以通過DataFrame介面對各種資料來源進行操作,例如orc,parquet,json,hive,jdbc,avro等。它既可以通過轉換成RDD進行操作,也可以被建立為一個臨時檢視。將外部資料讀入後建立為一個臨時檢視,可以用sql的方式進行查詢處理。這個特性可以很方便讓我們直接可以用Spark SQL操作各種格式的資料,而且可以輕鬆的將它們進行互相轉換。下面將介紹Spark SQL載入和儲存外部資料來源的一般方法。

測試JSON
這裡有一個people.json檔案,這裡用到的資料為spark官方提供的測試資料,在spark包的examples目錄下。

[[email protected] resources]$ cat people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

接下來用Spark SQL的外部資料來源的方式將資料載入為DataFrame。這裡用的是一種通用的方式進行載入。format裡面傳入的是將要處理的檔案的格式。load傳入的是檔案的路徑。

object ExternalDataSourceAPP {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("ExternalDataSourceAPP")
      .master("local[2]")
      .getOrCreate()

    /**
      * 通用
      */

      // 讀
       val jsonDF = sparkSession.read.format("json").load("file:\\F:\\sparktestdata\\resources\\people.json")
	   
       jsonDF.show()

	 sparkSession.stop()
  }
}

-------------------輸出結果--------------------
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

還有一種簡寫的方式進行載入,read之後直接.+你要處理的資料格式,裡面傳入path即可。例如處理json格式如下。其它的還有scv,text,jdbc,orc,parquet,table(hive)等。

val jsonDF = sparkSession.read.json("file:\\F:\\sparktestdata\\resources\\people.json")
      jsonDF.show()

Spark SQL外部資料來源不僅可以讀,還可以寫。我們將上面的jsonDF寫入一個json檔案。注意,這裡寫入只能指定到資料夾,不能指定到檔案。mode是寫入的方式,傳入的引數有overwrite,append,ignore,error。

jsonDF.write.format("json").mode("overwrite").save("file:\\F:\\sparktestdata\\resources\\peopleCopy.json")

這裡我們可以將讀入的json格式儲存為其它格式的資料,例如儲存為orc。我們可以發現,利用外部資料來源我們可以很方便的轉換資料格式。

jsonDF.write.format("orc").mode("overwrite").save("file:\\F:\\sparktestdata\\resources\\peopleCopy.json")

我們還可以直接將jsonDF儲存到hive表中

jsonDF.write.saveAsTable("person")