1. 程式人生 > >將巢狀的json轉成Hivesql 然後再進行操作

將巢狀的json轉成Hivesql 然後再進行操作

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.slf4j.{Logger, LoggerFactory}
import utils.Logs

/**
  * @Auther: sss
  * @Date: 2018/11/9 11:39
  * @Description:
  */


object Demo03 {
  private val logger = LoggerFactory.getLogger(Demo03.getClass)

  def main(args: Array[String]): Unit = {

    val session = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    import session.implicits._
    val frame = session.read.json("e:\\json1.json")


    frame.createOrReplaceTempView("tables01")

    val sql_a =
      """
        |select * from tables01
      """.stripMargin

    val df1 = session.sql(sql_a)
    // df1.foreach(t=> println(t))
    println("-------------111111111---------------------")
    val rdd = df1.rdd.flatMap {
      row =>
        val dataId = row.getAs[Long]("dataId")
        val dataType = row.getAs[String]("dataType")
        println(dataType.toString + "-------------2222222222222222---------------------")

        row.getAs[Seq[Row]]("resultData").map {
          rows =>
            val binlog = rows.getAs[String]("binlog")
            var tag = ""
            rows.getAs[Seq[Row]]("column").map {
              row02 =>
                val columnname = row02.getAs[String]("columnname")
                val columntype = row02.getAs[String]("columntype")
                val index = row02.getAs[Long]("index")
                val modified = row02.getAs[Boolean]("modified")
                val pk = row02.getAs[Boolean]("pk")
                val sqlType = row02.getAs[Long]("sqlType")
                val value = row02.getAs[String]("value")

                tag = String.format(s"[columnname:$columnname,columntype:$columntype,index:$index,modified:$modified,pk:$pk,sqlType:$sqlType,value:$value]")
            }

            val db = rows.getAs[String]("db")
            val eventType = rows.getAs[String]("eventType")
            val pkValue = rows.getAs[String]("pkValue")
            val sql = rows.getAs[String]("sql")
            val table = rows.getAs[String]("table")
            val time = rows.getAs[Long]("time")

            val tags = String.format(s"[dataId:$dataId,dataType:$dataType,binlog:$binlog,tag:$tag,db:$db,eventType:$eventType,pkValue:$pkValue,sql:$sql,table:$table,time:$time]")

            //dataId, dataType, binlog, tag, db, eventType, pkValue, sql, table, time,
            ( tags)

        }
    }

    rdd.toDF().write.saveAsTable("tmp_table")

    //,_2,_3,_4,_5,_6,_7,_8,_9,_10
    val result =
      """
        |select * from tmp_table
      """.stripMargin
    session.sql(result).write.json("C:\\Users\\sss\\Desktop\\outExample01")
  }
}

資料如下:

{"dataId":123,"dataType":"mysql","resultData":[{"binlog":"mysql_binlog.000","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"7"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"7"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"7"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"7"}],"db":"demo","eventType":"insert","pkValue":"7","sql":"woshisql","table":"student","time":80146942099474},{"binlog":"mysql_binlog.001","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"9"}],"db":"demo","eventType":"insert","pkValue":"9","sql":"woshisql","table":"student","time":80146943574276},{"binlog":"mysql_binlog.002","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"2"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"2"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"2"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"2"}],"db":"demo","eventType":"update","pkValue":"2","sql":"woshisql","table":"student","time":80146943586222},{"binlog":"mysql_binlog.003","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"4"}],"db":"demo","eventType":"update","pkValue":"4","sql":"woshisql","table":"student","time":80146943592561},{"binlog":"mysql_binlog.004","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"6"}],"db":"demo","eventType":"update","pkValue":"6","sql":"woshisql","table":"student","time":80146943599876},{"binlog":"mysql_binlog.005","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"4"}],"db":"demo","eventType":"update","pkValue":"4","sql":"woshisql","table":"student","time":80146943605971},{"binlog":"mysql_binlog.006","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"6"}],"db":"demo","eventType":"update","pkValue":"6","sql":"woshisql","table":"student","time":80146943611335},{"binlog":"mysql_binlog.007","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"9"}],"db":"demo","eventType":"update","pkValue":"9","sql":"woshisql","table":"student","time":80146943617917},{"binlog":"mysql_binlog.008","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"10"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"10"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"10"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"10"}],"db":"demo","eventType":"update","pkValue":"10","sql":"woshisql","table":"student","time":80146943623525},{"binlog":"mysql_binlog.009","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"1"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"1"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"1"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"1"}],"db":"demo","eventType":"update","pkValue":"1","sql":"woshisql","table":"student","time":80146943629864}]}