
Flattening JSON data in a DataFrame and adding an auto-increment id column

Sample JSON data, one object per line (the source file evidently repeats these records, which is why the outputs below show each of them several times):

{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
{"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}
{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
import org.apache.spark.sql.SparkSession

object explodeTest {
  def main(args: Array[String]): Unit = {

    val sparks = SparkSession.builder.master("local[4]").appName("test1").getOrCreate
    val sc = sparks.sparkContext

    // Spark reads the JSON data (one JSON object per line)
    val df = sparks.read.json("file:///C:\\Users\\imp\\Desktop\\bo-kong\\data\\josn")

    df.show()
    /** Output of df.show():
      * +---+--------------------+-------+
      * |age|             myScore|   name|
      * +---+--------------------+-------+
      * | 25|  [[19,23], [58,50]]|Michael|
      * | 30|[[29,33], [38,52]...|   Andy|
      * | 19|  [[39,43], [28,53]]| Justin|
      * | 25|  [[19,23], [58,50]]|Michael|
      * | 30|[[29,33], [38,52]...|   Andy|
      * | 19|  [[39,43], [28,53]]| Justin|
      * | 25|  [[19,23], [58,50]]|Michael|
      * | 30|[[29,33], [38,52]...|   Andy|
      * | 19|  [[39,43], [28,53]]| Justin|
      * +---+--------------------+-------+
      */

    // Use the explode function from org.apache.spark.sql.functions to flatten
    // the myScore array: each array element becomes a row of its own
    import org.apache.spark.sql.functions._
    val dfScore = df.select(df("name"), explode(df("myScore"))).toDF("name", "myScore")
    // Promote the nested struct fields to top-level columns
    val dfMyScore = dfScore.select("name", "myScore.score1", "myScore.score2")
    dfScore.show()
    // Register df as a temp view so it can also be queried with SQL
    df.createOrReplaceTempView("df")
    /** Output of dfScore.show():
      * +-------+-------+
      * |   name|myScore|
      * +-------+-------+
      * |Michael|[19,23]|
      * |Michael|[58,50]|
      * |   Andy|[29,33]|
      * |   Andy|[38,52]|
      * |   Andy|[88,71]|
      * | Justin|[39,43]|
      * | Justin|[28,53]|
      * |Michael|[19,23]|
      * |Michael|[58,50]|
      * |   Andy|[29,33]|
      * |   Andy|[38,52]|
      * |   Andy|[88,71]|
      * | Justin|[39,43]|
      * | Justin|[28,53]|
      * |Michael|[19,23]|
      * |Michael|[58,50]|
      * |   Andy|[29,33]|
      * |   Andy|[38,52]|
      * |   Andy|[88,71]|
      * | Justin|[39,43]|
      * +-------+-------+
      * only showing top 20 rows
      */
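
    // Because df was registered as a temp view above, the same flattening can
    // be written in Spark SQL with LATERAL VIEW explode. A minimal sketch
    // (the aliases t and score are illustrative, not from the original post):
    val dfSqlScore = sparks.sql(
      """SELECT name, score.score1, score.score2
        |FROM df
        |LATERAL VIEW explode(myScore) t AS score""".stripMargin)
    dfSqlScore.show()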



  }
}

Contents of the data file used below (one value per line):
aa
bb
cc
dd
ee
ff

Adding an index (primary key) column to a DataFrame

    // Log pairs a map of fields with an id; defined here but not used below
    case class Log(map: scala.collection.mutable.Map[String, String], ID: Long)
    import sparks.implicits._

    // First build an RDD[Map] and apply zipWithIndex to see the effect:
    // the second element of each resulting tuple is the auto-increment id
    val data2 = sc.parallelize(Seq(
        Map("uuid" -> "sxexx", "ip" -> "192.168"),
        Map("uuid" -> "man", "ip" -> "192.168.10.1")
      )).zipWithIndex()
    data2.collect().foreach(print(_))
    /** Output:
      * (Map(uuid -> sxexx, ip -> 192.168),0)
      * (Map(uuid -> man, ip -> 192.168.10.1),1)
      */

    // zipWithIndex pairs each line with its index, so name the columns
    // accordingly: the line content is "value", the index is "id"
    val data = sc.textFile("file:///C:\\Users\\imp\\Desktop\\bo-kong\\data\\data")
      .zipWithIndex().toDF("value", "id")
    data.show()
    /** Result with the data file above:
      * +-----+---+
      * |value| id|
      * +-----+---+
      * |   aa|  0|
      * |   bb|  1|
      * |   cc|  2|
      * |   dd|  3|
      * |   ee|  4|
      * |   ff|  5|
      * +-----+---+
      */
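
zipWithIndex goes through the RDD API. For DataFrames there is also a built-in alternative: monotonically_increasing_id() from org.apache.spark.sql.functions generates ids that are increasing and unique per row but not consecutive; for consecutive ids, row_number() over a window works, at the cost of moving all rows into one partition. A minimal sketch, assuming the sparks session and the data DataFrame from above (the column names monoId and rowId are illustrative):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Increasing and unique, but not consecutive; needs no shuffle
    val withId = data.withColumn("monoId", monotonically_increasing_id())
    withId.show()

    // Consecutive 1-based ids; the window has no partitionBy, so Spark
    // moves all rows to one partition to compute row_number()
    val withRowNum = data.withColumn("rowId", row_number().over(Window.orderBy("id")))
    withRowNum.show()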
