1. 程式人生 > >Spark SQL,如何將 DataFrame 轉為 json 格式

Spark SQL,如何將 DataFrame 轉為 json 格式

今天主要介紹一下如何將 Spark dataframe 的資料轉成 json 資料。用到的是 scala 提供的 json 處理的 api。

用過 Spark SQL 應該知道,Spark dataframe 本身有提供一個 api 可以供我們將資料轉成一個 JsonArray,我們可以在 spark-shell 裡頭舉個栗子來看一下。

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate();
//提供隱式轉換功能,比如將 Rdd 轉為 dataframe
import spark.implicits._

val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc|  2|
|efg|  4|
+---+---+
*/

//這裡使用 dataframe Api 轉換成 jsonArray
val jsonStr:String = a.toJSON.collectAsList.toString
/*--------------- json String-------------
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
*/

可以發現,我們可以使用 dataframe 提供的 api 直接將 dataframe 轉換成 jsonArray 的形式,但這樣子卻有些冗餘。以上面的例子來說,很多時候我要的不是這樣的形式。

[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]

而是下面這種形式。

[{"abc":2}, {"efg":4}]

這才是我們通常會使用到的 json 格式。以 dataframe 的 api 轉換而成的 json 明顯太過冗餘。為此,我們需要藉助一些 json 處理的包,本著能懶則懶的原則,直接使用 scala 提供的 json 處理包。

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate();
//提供隱式轉換功能,比如將 Rdd 轉為 dataframe
import spark.implicits._

val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc|  2|
|efg|  4|
+---+---+
*/

//接下來不一樣了
val df2Array:Array[Tuple2[String,Int]] = df.collect().map{case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)}

val jsonData:Array[JSONObject] = aM.map{ i =>
  new JSONObject(Map(i._1 -> i._2))
}

val jsonArray:JSONArray = new JSONArray(jsonData.toList)
/*-----------jsonArray------------
[{"abc" : 2}, {"efg" : 4}]
*/

大概說明一下上述的程式碼,首先我們要先將 df 變數進行 collect 操作,將它轉換成 Array ,但是要生成 jsonObject 得是 Array[Tuple2[T,T]] 的格式,所以我們需要再進一步轉換成對應格式。這裡的 map 是函數語言程式設計裡面的 map 。

然後也是用 map 操作生成 Array[JSONObject],最後再轉換成 JSONArray 就可以。

將資料轉換成 json 的格式通常不能太大,一般用在 spark 跑出資料結果後寫入到其他資料庫的時候會用到,比如 Mysql 。

以上~~


歡迎關注公眾號哈爾的資料城堡,裡面有資料,程式碼,以及深度的思考。