1. 程式人生 > >Spark使用explode展開巢狀的JSON資料

Spark使用explode展開巢狀的JSON資料

在使用Spark的人中,估計很多人都會使用DataFrame及SQLContext,而在眾多的資料格式中,很可能會遇到JSON資料,此資料還可能包含巢狀關係,比如像如下的JSON資料:

{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
{"name
":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}

此時,如果我們直接用DataFrame的show方法可以看到:

+---+--------------------+-------+
|age|             myScore|   name|
+---+--------------------+-------+
| 25|  [[23,19], [50,58]]|Michael|
| 30|[[33,29], [52,38]...|   Andy|
| 19|  [[43,39], [53,28]]| Justin|
+---+--------------------+-------+

root
 |-- age: long (nullable = true)
 |-- myScore: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- score2: long (nullable = true)
 |    |    |-- score1: long (nullable = true)
 |-- name: string (nullable = true)

由於myScore是一個數組,所以,在上述show得到的表中,我們不能直接使用sql來查詢或聚合,那麼如何才能將myScore的陣列型別展開呢?
我們可以考慮使用explode函式,如下

val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
df.show()
df.printSchema()
val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
val dfMyScore = dfScore.select
("name","myScore.score1", "myScore.score2") dfScore.show()

此時,會得到如下結果,這個時候的表,就跟我們平時看到的關係型資料庫的表是一樣的了,接下來,我們就可以執行相關的sql查詢了。

+-------+-----------------+------------------+
|   name|           score1|            score2|
+-------+-----------------+------------------+
|Michael|               19|                23|
|Michael|               58|                50|
|   Andy|               29|                33|
|   Andy|               38|                52|
|   Andy|               88|                71|
| Justin|               39|                43|
| Justin|               28|                53|
+-------+-----------------+------------------+

完整的程式碼如下:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
import org.junit.{After, Before, Test}
import org.junit.Assert.assertEquals

/**
  * Created by yang on 8/16/16.
  */
class Test {

  @transient var sc: SparkContext = _

  @transient var sqlContext:SQLContext = _

  @Before
  def init(): Unit ={
    val conf = new SparkConf().setAppName("Test").setMaster("spark://master:7077")
    sc = new SparkContext(conf)
    sqlContext = new org.apache.spark.sql.SQLContext(sc)
  }

  @Test
  def TestMapFun(): Unit ={
    val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
    df.show()
    df.printSchema()

    val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
    val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
    dfMyScore.show()

    dfMyScore.registerTempTable("table1")
    val result = sqlContext.sql("select name,avg(hwScore_Std),avg(exScore_Std) from table1")
    assertEquals(7,dfMyScore.count())
  }
}  

以上程式碼需要一些包,我是用sbt構建的,內容如下:

name := "Test"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "junit" % "junit" % "4.12" % "test"

libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0"