【Big Data 每日一題20180927】Structured Streaming 之 Event Time 解析
Structured Streaming 之 Event Time 解析
[酷玩 Spark] Structured Streaming 原始碼解析系列 ,返回目錄請 猛戳這裡
本文內容適用範圍:
* 2017.07.11 update, Spark 2.2 全系列 √ (已釋出:2.2.0)
* 2017.10.02 update, Spark 2.1 全系列 √ (已釋出:2.1.0, 2.1.1, 2.1.2)
閱讀本文前,請一定先閱讀 Structured Streaming 實現思路與實現概述 一文,其中概述了 Structured Streaming 的實現思路,有了全域性概念後再看本文的細節解釋。
Event Time !
Spark Streaming 時代有過非官方的 event time 支援嘗試 [1],而在進化後的 Structured Streaming 裡,添加了對 event time 的原生支援。
我們來看一段官方 programming guide 的例子 [2]:
import spark.implicits._ val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } // Group the data by window and word and compute the count of each group // Please note: we'll revise this example in <Structured Streaming 之 Watermark 解析> val windowedCounts = words.groupBy( window($"timestamp", "10 minutes", "5 minutes"), $"word" ).count()
這裡的執行過程如下圖。
- 我們有一系列 arriving 的 records
- 首先是一個對著時間列
timestamp
做長度為10m
,滑動為5m
的 window() 操作- 例如上圖右上角的虛框部分,當達到一條記錄
12:22|dog
時,會將12:22
歸入兩個視窗12:15-12:25
、12:20-12:30
,所以產生兩條記錄:12:15-12:25|dog
、12:20-12:30|dog
,對於記錄12:24|dog owl
同理產生兩條記錄:12:15-12:25|dog owl
、12:20-12:30|dog owl
- 所以這裡 window() 操作的本質是 explode()
- 例如上圖右上角的虛框部分,當達到一條記錄
- 然後對 window() 操作的結果,以
window
列和word
列為 key,做 groupBy().count() 操作- 這個操作的聚合過程是增量的(藉助 StateStore)
- 最後得到一個有
window
,word
,count
三列的狀態集
處理 Late Data
還是沿用前面 window() + groupBy().count() 的例子,但注意有一條遲到的資料 12:06|cat
:
可以看到,在這裡的 late data,在 State 裡被正確地更新到了應在的位置。
OutputModes
我們繼續來看前面 window() + groupBy().count() 的例子,現在我們考慮將結果輸出,即考慮 OutputModes:
(a) Complete
Complete 的輸出是和 State 是完全一致的:
(b) Append
Append 的語義將保證,一旦輸出了某條 key,未來就不會再輸出同一個 key。
所以,在上圖 12:10
這個批次直接輸出 12:00-12:10|cat|1
, 12:05-12:15|cat|1
將是錯誤的,因為在 12:20
將結果更新為了 12:00-12:10|cat|2
,但是 Append 模式下卻不會再次輸出 12:00-12:10|cat|2
,因為前面輸出過了同一條 key 12:00-12:10|cat
的結果12:00-12:10|cat|1
。
為了解決這個問題,在 Append 模式下,Structured Streaming 需要知道,某一條 key 的結果什麼時候不會再更新了。當確認結果不會再更新的時候(下一篇文章專門詳解依靠 watermark 確認結果不再更新),就可以將結果進行輸出。
如上圖所示,如果我們確定 12:30
這個批次以後不會再有對 12:00-12:10
這個 window 的更新,那麼我們就可以把 12:00-12:10
的結果在 12:30
這個批次輸出,並且也會保證後面的批次不會再輸出 12:00-12:10
的 window 的結果,維護了 Append 模式的語義。
(c) Update
Update 模式已在 Spark 2.1.1 及以後版本獲得正式支援。
如上圖所示,在 Update 模式中,只有本執行批次 State 中被更新了的條目會被輸出:
- 在 12:10 這個執行批次,State 中全部 2 條都是新增的(因而也都是被更新了的),所以輸出全部 2 條;
- 在 12:20 這個執行批次,State 中 2 條是被更新了的、 4 條都是新增的(因而也都是被更新了的),所以輸出全部 6 條;
- 在 12:30 這個執行批次,State 中 4 條是被更新了的,所以輸出 4 條。這些需要特別注意的一點是,如 Append 模式一樣,本執行批次中由於(通過 watermark 機制)確認
12:00-12:10
這個 window 不會再被更新,因而將其從 State 中去除,但沒有因此產生輸出。
總結
本文解析了 Structured Streaming 原生提供的對 event time 的支援,包括 window()、groupBy() 增量聚合、對 late date 的支援、以及在 Complete, Append, Update 模式下的輸出結果。