Spark zip運算元優雅的實現差分計算
阿新 • • 發佈:2018-12-22
Spark zip運算元實現高效的差分演算法
什麼是差分呢?用公式表示就如下圖所示:
差分計算有什麼用呢?它在時間序列建模中重要作用自不必說,單是在日常開發的各個場景中就有許多用的上的地方,比如在廣告瀏覽中,我們監控每個id點選某個廣告的間隔時長等指標,判斷是否存在惡意點選的行為;再比如我們監控某個感測器傳入的資料間隔,判斷該感測器是否正常工作。類似的應用場景還有許多。在這裡就不做更多介紹。
雖然這些計算在實時處理的時候可以很簡單的就能實現,但是很多情況下我們還是需要進行離線計算,在spark sql中如何能快速的實現這個功能呢,zip運算元為我們提供了一個優雅的實現。
案例:現在我們有一個數據集,記錄的是感測器傳輸訊息的時間,為了簡單起見,筆者在這裡只給出了id,和時間戳,我們要計算感測器傳輸的時間間隔
|
// 資料量
+---+-----+
|mid|count|
+---+-----+
| m1|84840|
| m2|35621|
+---+-----+
val spec = Window.partitionBy($"mid").orderBy($"timestamp") val sensor = spark.read.option("header", true) .option("InferSchema", true) .csv(inputFile) // 載入資料 .withColumn("r", row_number().over(spec)) // 利用視窗函式對資料分割槽排序 /** * 實現的思路就是將原資料分為分別 “去頭掐尾” * 然後在利用zip函式 進行差分計算 */ val n = 1 // 差分階數 val lastnMap = sensor.groupBy($"mid").agg(last("r")) .toDF("mid", "r").rdd.map(row => { val mid = row.getAs[String]("mid") val r = row.getAs[Int]("r") val range = (r - n + 1) to r range.map(i => (mid, i) -> false) }).flatMap(_.toMap).collect().toMap val lastBroad = spark.sparkContext.broadcast(lastnMap) val exceptLastUDF = udf((mmsi: String, r: Int) => { lastBroad.value.getOrElse((mmsi, r), true) }) sensor.groupBy($"mid").count().show() // 去掉每組後n行資料 val exceptLastRDD: RDD[Row] = sensor.where(exceptLastUDF($"mid", $"r")).rdd // 去掉每組前n行資料 val exceptFirstRDD: RDD[Row] = sensor.where($"r" > n).rdd // RDD[(Row, Row) val diffed: DataFrame = exceptFirstRDD.zip(exceptLastRDD) .map(drow => { // 兩個row之間還可以進行更復雜的操作,這裡只做時間差計算 val mid = drow._1.getAs[String]("mid") val time = drow._1.getAs[Int]("timestamp") val time2 = drow._2.getAs[Int]("timestamp") val r = drow._1.getAs[Int]("r") val r2 = drow._2.getAs[Int]("r") (mid, time, r, r2, time - time2) }).toDF("mid", "time", "r", "r2", "timeDiff")
// 結果檢視 diffed.show() +---+----------+---+---+--------+ |mid| time| r| r2|timeDiff| +---+----------+---+---+--------+ | m1|1519833112| 2| 1| 180| | m1|1519833253| 3| 2| 141| | m1|1519833472| 4| 3| 219| | m1|1519833652| 5| 4| 180| | m1|1519833952| 6| 5| 300| +---+----------+---+---+--------+