為什麼 Spark Streaming + Kafka 無法保證 exactly once?
Streaming job 的排程與執行
為什麼很難保證 exactly once
上面這張流程圖最主要想說明的就是,job 的提交執行是非同步的,與 checkpoint 操作並不是原子操作。這樣的機制會引起資料重複消費問題:
為了簡化問題容易理解,我們假設一個 batch 只生成一個 job,並且 spark.streaming.concurrentJobs 值為1,該值代表 jobExecutor 執行緒池中執行緒的個數,也即可以同時執行的 job 的個數。
假設,batch duration 為2s,一個 batch 的總共處理時間為1s,此時,一個 batch 開始了,第一步生成了一個 job,假設花了0.1s,然後把該 job 丟到了 jobExecutor 執行緒池中等待排程執行,由於 checkpoint 操作和 job 線上程池中執行是非同步的,在0.2s 的時候,checkpoint 操作完成並且此時開始了 job 的執行。
注意,這個時候 checkpoint 完成了並且該 job 在 checkpoint 中的狀態是未完成的,隨後在第1s 的時候 job 完成了,那麼在這個 batch 結束的時候 job 已經完成了但該 job 在 checkpoint 中的狀態是未完成的
在下一個 batch 執行到 checkpoint 之前就掛了(比如在拉取資料的時候掛了、OOM 掛了等等異常情況),driver 隨後從 checkpoint 中恢復,那麼上述的 job 依然是未執行的,根據使用的 api 不同,對於這個 job 會再次拉取資料或從 wal 中恢復資料重新執行該 job,那麼這種情況下該 job 的資料就就會被重複處理。比如這時記次的操作,那麼次數就會比真實的多。
如果一個 batch 有多個 job 並且spark.streaming.concurrentJobs大於1,那麼這種情況就會更加嚴重,因為這種情況下就會有多個 job 已經完成但在 checkpoint 中還是未完成狀態,在 driver 重啟後這些 job 對應的資料會被重複消費處理。
另一種會導致資料重複消費的情況主要是由於 Spark 處理的資料單位是 partition 引起的。比如在處理某 partition 的資料到一半的時候,由於資料內容或格式會引起拋異常,此時 task 失敗,Spark 會排程另一個同樣的 task 執行,那麼此時引起 task 失敗的那條資料之前的該 partition 資料就會被重複處理,雖然這個 task 被再次排程依然會失敗。若是失敗還好,如果某些特殊的情況,新的 task 執行成功了,那麼我們就很難發現數據被重複消費處理了。
如何保證 exactly once
至於如何才能保證 exactly once,其實要根據具體情況而定(廢話)。總體來說,可以考慮以下幾點:
業務是否不能容忍即使是極少量的資料差錯,如果是那麼考慮 exactly once。如果可以容忍,那就沒必要非實現 exactly once 不可
即使重複處理極小部分資料會不會對最終結果產生影響。若不會,那重複處理就重複吧,比如排重統計
若一定要保證 exactly once,應該考慮將對 partition 處理和 checkpoint或自己實現類似 checkpoint 功能的操作做成原子的操作;並且對 partition 整批資料進行類似事物的處理
感興趣可以加Java架構師群獲取Java工程化、高效能及分散式、高效能、深入淺出。高架構。效能調優、Spring,MyBatis,Netty原始碼分析和大資料等多個知識點高階進階乾貨的直播免費學習許可權 都是大牛帶飛 讓你少走很多的彎路的 群…號是:855801563 對了 小白勿進 最好是有開發經驗
注:加群要求
1、具有工作經驗的,面對目前流行的技術不知從何下手,需要突破技術瓶頸的可以加。
2、在公司待久了,過得很安逸,但跳槽時面試碰壁。需要在短時間內進修、跳槽拿高薪的可以加。
3、如果沒有工作經驗,但基礎非常紮實,對java工作機制,常用設計思想,常用java開發框架掌握熟練的,可以加。
4、覺得自己很牛B,一般需求都能搞定。但是所學的知識點沒有系統化,很難在技術領域繼續突破的可以加。
5.阿里Java高階大牛直播講解知識點,分享知識,多年工作經驗的梳理和總結,帶著大家全面、科學地建立自己的技術體系和技術認知!