1. 程式人生 > >Spark zip運算元優雅的實現差分計算

Spark zip運算元優雅的實現差分計算

Spark zip運算元實現高效的差分演算法

什麼是差分呢?用公式表示就如下圖所示:
差分

差分計算有什麼用呢?它在時間序列建模中重要作用自不必說,單是在日常開發的各個場景中就有許多用的上的地方,比如在廣告瀏覽中,我們監控每個id點選某個廣告的間隔時長等指標,判斷是否存在惡意點選的行為;再比如我們監控某個感測器傳入的資料間隔,判斷該感測器是否正常工作。類似的應用場景還有許多。在這裡就不做更多介紹。

雖然這些計算在實時處理的時候可以很簡單的就能實現,但是很多情況下我們還是需要進行離線計算,在spark sql中如何能快速的實現這個功能呢,zip運算元為我們提供了一個優雅的實現。
案例:現在我們有一個數據集,記錄的是感測器傳輸訊息的時間,為了簡單起見,筆者在這裡只給出了id,和時間戳,我們要計算感測器傳輸的時間間隔
| 資料檢視

|

// 資料量
+---+-----+
|mid|count|
+---+-----+
| m1|84840|
| m2|35621|
+---+-----+

    val spec = Window.partitionBy($"mid").orderBy($"timestamp")

    val sensor = spark.read.option("header", true)
      .option("InferSchema", true)
      .csv(inputFile) // 載入資料
      .withColumn("r", row_number().over(spec)) // 利用視窗函式對資料分割槽排序


    /**
      * 實現的思路就是將原資料分為分別 “去頭掐尾”
      * 然後在利用zip函式 進行差分計算
      */
    val n = 1 // 差分階數
    val lastnMap = sensor.groupBy($"mid").agg(last("r"))
      .toDF("mid", "r").rdd.map(row => {
      val mid = row.getAs[String]("mid")
      val r = row.getAs[Int]("r")
      val range = (r - n + 1) to r
      range.map(i => (mid, i) -> false)
    }).flatMap(_.toMap).collect().toMap

    val lastBroad = spark.sparkContext.broadcast(lastnMap)

    val exceptLastUDF = udf((mmsi: String, r: Int) => {
      lastBroad.value.getOrElse((mmsi, r), true)
    })



    sensor.groupBy($"mid").count().show()

    // 去掉每組後n行資料
    val exceptLastRDD: RDD[Row] = sensor.where(exceptLastUDF($"mid", $"r")).rdd

    // 去掉每組前n行資料
    val exceptFirstRDD: RDD[Row] = sensor.where($"r" > n).rdd

    // RDD[(Row, Row)
    val diffed: DataFrame = exceptFirstRDD.zip(exceptLastRDD)
      .map(drow => {
        //     兩個row之間還可以進行更復雜的操作,這裡只做時間差計算
        val mid = drow._1.getAs[String]("mid")
        val time = drow._1.getAs[Int]("timestamp")
        val time2 = drow._2.getAs[Int]("timestamp")
        val r = drow._1.getAs[Int]("r")
        val r2 = drow._2.getAs[Int]("r")
        (mid, time, r, r2, time - time2)
      }).toDF("mid", "time", "r", "r2", "timeDiff")

    // 結果檢視
    diffed.show()
 +---+----------+---+---+--------+
|mid|      time|  r| r2|timeDiff|
+---+----------+---+---+--------+
| m1|1519833112|  2|  1|     180|
| m1|1519833253|  3|  2|     141|
| m1|1519833472|  4|  3|     219|
| m1|1519833652|  5|  4|     180|
| m1|1519833952|  6|  5|     300|
+---+----------+---+---+--------+