1. 程式人生 > >【Big Data 每日一題20180927】Structured Streaming 之 Event Time 解析

【Big Data 每日一題20180927】Structured Streaming 之 Event Time 解析

Structured Streaming 之 Event Time 解析

[酷玩 Spark] Structured Streaming 原始碼解析系列 ,返回目錄請 猛戳這裡

本文內容適用範圍:
* 2017.07.11 update, Spark 2.2 全系列 √ (已釋出:2.2.0)
* 2017.10.02 update, Spark 2.1 全系列 √ (已釋出:2.1.0, 2.1.1, 2.1.2)

閱讀本文前,請一定先閱讀 Structured Streaming 實現思路與實現概述 一文,其中概述了 Structured Streaming 的實現思路,有了全域性概念後再看本文的細節解釋。

Event Time !

Spark Streaming 時代有過非官方的 event time 支援嘗試 [1],而在進化後的 Structured Streaming 裡,添加了對 event time 的原生支援。

我們來看一段官方 programming guide 的例子 [2]:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
// Please note: we'll revise this example in <Structured Streaming 之 Watermark 解析>
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

這裡的執行過程如下圖。

  • 我們有一系列 arriving 的 records
  • 首先是一個對著時間列 timestamp 做長度為10m,滑動為5m 的 window() 操作
    • 例如上圖右上角的虛框部分,當達到一條記錄 12:22|dog 時,會將 12:22 歸入兩個視窗 12:15-12:2512:20-12:30,所以產生兩條記錄:12:15-12:25|dog12:20-12:30|dog,對於記錄 12:24|dog owl 同理產生兩條記錄:12:15-12:25|dog owl12:20-12:30|dog owl
    • 所以這裡 window() 操作的本質是 explode()
      ,可由一條資料產生多條資料
  • 然後對 window() 操作的結果,以 window 列和 word 列為 key,做 groupBy().count() 操作
    • 這個操作的聚合過程是增量的(藉助 StateStore)
  • 最後得到一個有 windowwordcount 三列的狀態集

處理 Late Data

還是沿用前面 window() + groupBy().count() 的例子,但注意有一條遲到的資料 12:06|cat :

可以看到,在這裡的 late data,在 State 裡被正確地更新到了應在的位置。

OutputModes

我們繼續來看前面 window() + groupBy().count() 的例子,現在我們考慮將結果輸出,即考慮 OutputModes:

(a) Complete

Complete 的輸出是和 State 是完全一致的:

(b) Append

Append 的語義將保證,一旦輸出了某條 key,未來就不會再輸出同一個 key。

所以,在上圖 12:10 這個批次直接輸出 12:00-12:10|cat|112:05-12:15|cat|1 將是錯誤的,因為在 12:20 將結果更新為了 12:00-12:10|cat|2,但是 Append 模式下卻不會再次輸出 12:00-12:10|cat|2,因為前面輸出過了同一條 key 12:00-12:10|cat 的結果12:00-12:10|cat|1

為了解決這個問題,在 Append 模式下,Structured Streaming 需要知道,某一條 key 的結果什麼時候不會再更新了。當確認結果不會再更新的時候(下一篇文章專門詳解依靠 watermark 確認結果不再更新),就可以將結果進行輸出。

如上圖所示,如果我們確定 12:30 這個批次以後不會再有對 12:00-12:10 這個 window 的更新,那麼我們就可以把 12:00-12:10 的結果在 12:30 這個批次輸出,並且也會保證後面的批次不會再輸出 12:00-12:10 的 window 的結果,維護了 Append 模式的語義。

(c) Update

Update 模式已在 Spark 2.1.1 及以後版本獲得正式支援。

如上圖所示,在 Update 模式中,只有本執行批次 State 中被更新了的條目會被輸出:

  • 在 12:10 這個執行批次,State 中全部 2 條都是新增的(因而也都是被更新了的),所以輸出全部 2 條;
  • 在 12:20 這個執行批次,State 中 2 條是被更新了的、 4 條都是新增的(因而也都是被更新了的),所以輸出全部 6 條;
  • 在 12:30 這個執行批次,State 中 4 條是被更新了的,所以輸出 4 條。這些需要特別注意的一點是,如 Append 模式一樣,本執行批次中由於(通過 watermark 機制)確認 12:00-12:10 這個 window 不會再被更新,因而將其從 State 中去除,但沒有因此產生輸出。

總結

本文解析了 Structured Streaming 原生提供的對 event time 的支援,包括 window()、groupBy() 增量聚合、對 late date 的支援、以及在 Complete, Append, Update 模式下的輸出結果。

擴充套件閱讀

參考資料