Spark Streaming對Exactly Once的實現原理
昨天看到了這篇文章: 為什麼Spark Streaming + Kafka很難保證exactly once? 看過後,對作者對Exactly Once的理解不敢苟同,所以想寫這篇文章,闡述一下我對Spark Streaming保證Exactly Once語義的理解。
Exactly Once實現的整體性
首先一個很重要的道理是: 整個系統對exactly once的保證,從來都不是靠系統中某一部分來實現就能搞定的,需要整個流式系統一起努力才可以實現。
對Spark Streaming來說, Exactly once的實現,需要系統中三部分的整體保證:
輸入源 --> Spark Streaming計算 ---> 輸出操作
"輸入源"對於exactly once的實現: Kafka的directly API其實就是在解決輸入源輸入資料的exactly once語義;
"Spark Streaming"部分的exactly once的shi實現: 使用WAL保證(注意我沒有提checkpoint和replication, 因為這兩個failover機制,並不是專門解決exactly once這個問題的)。
"輸出操作"對於exactly once的實現: 需要輸出結果保證冪等性, 這點官方文件已經說的比較清楚:
In order to achieve exactly-once semantics for output of your results,
其中, "輸入源"對於exactly once的實現可以專門再找專題講述, 不涉及對本文核心內容的理解; "輸出操作"對exactly once的實現很好理解,無非就是獨立出每次batch計算的結果並保證可重入, 所以本文重點介紹Spark Stream核心對Exactly Once的實現原理。
Spark Streaming計算框架對failover的實現
從網上能查到的千篇一律的Streaming計算邏輯,我這裡就不講了, 重點介紹一下計算框架對failover的處理部分。
計算框架使用3種方式來實現整體的failover機制:
1 checkpoint(注意,這個checkpoint和RDD的checkpoint是兩碼事): 在Driver層實現, 用於在Driver崩潰後,恢復Driver的現場;
2 replication: 在Receiver中用於解決單臺executor掛掉後,未儲存的資料丟失的問題。
3 WAL: 在Driver和Receiver中實現,用於解決:
(1) Driver掛掉,所有executor都會掛掉,那麼所有未儲存的資料都丟掉了,replication就不管用了;
(2) Driver掛掉後, 哪些block在掛掉前註冊到了driver中,以及掛掉前哪些block分配給了當時正在執行的batch job, 這些資訊就都丟失了;所以需要WAL對這些資訊做持久化。
其中, 第3個機制中的第2條,才是計算框架用於實現Exactly Once的努力, 下面針對這一點詳細展開。
WAL解決資料丟失問題的原理
Driver對輸入資料處理的步驟:
1 addBlock: 將輸入資料儲存轉化為block,並儲存到blockQueue中; //在ReceiverTrackerEndpoint執行緒處理;
2 allocateBlocksToBatch: 將當前所有未處理的block,分配給batch,然後刪除blockQueue中的所有block; //在JobGenerator的eventLoop執行緒中處理;
3 用分給本次batch的所有資料進行job計算; //在JobHandler執行緒中處理;
上面的前2個步驟,完全是在2個不同的執行緒裡非同步執行,而且會對資料的形式進行變換(buffer -> block -> batch data),
每一步的完成都會刪除前面一種資料形式的資料。所以,前2步,都會進行WAL操作,持久化資料。
簡單說,WAL中儲存的資料是這樣的:
A B C D E
addblock1 --> addblock2 --> 將所有block分給batch然後刪除所有block --> addblock3 --> addblock4....
這樣任何一個階段driver崩潰再恢復的時候,根據WAL,就可以恢復當時的資料;
舉例來說:
1 當執行到C時, 做了C的WAL儲存,然後開始執行對應的batch job, 這時Driver掛掉了; 再restart恢復時,發現WAL裡面
有A -> B -> C, 會一次重放執行, 也就是先將block1和block2放入blockQueue,然後將這2個block分配個batch job,然後刪除
掉blockQueue裡的這兩個block;這樣資料就恢復到了崩潰之前的現場;
2 當執行到E時,Driver崩潰, 那麼重新恢復後,一次執行A,B,C,D,E; 執行結果就是, block1和block2分配給了batch job
並執行,block3和block4存放在blockQueue中,等待下次到時分配給下一個batch job.
Spark Streaming計算框架無法解決的job執行一半的問題
但這裡仍無法解決一個問題,就是如果job執行一半(比如已經寫了一部分到結果資料庫中),driver崩潰,再恢復時,這個job會重新執行,但是上一次執行一半的job已經寫了部分到資料庫了。解決這個問題,就需要"輸出操作"是冪等的,這就不是Spark Streaming解決的問題了,需要應用程式自己來保證。
總結
輸入源對於Exactly Once需要實現的是: 崩潰前一部分資料已經作為block1輸入到Spark了, 那麼崩潰後恢復,這部分資料不能再輸入到spark中;
Spark Streaming計算框架對於Exactly Once需要實現的是: 接收輸入資料以及分配給Batch job資料,這兩部分的持久化一步都不能少,因為流入資料形成block和將block資料分給Batch,是分離的兩個步驟,沒有事務化;這其實也是本質上造成必須有Kafka Direct API的原因。比如只做了持久化block,但是沒有持久化哪些block應該分配給batch
job,那麼下次恢復時,哪些資料該屬於這個batch job,就完全不可知。最要命的是, 比如正在跑2個batch job,先跑的job1, 再跑的job2; 如果job2先跑完了,job1還在跑,這時崩潰了,那麼恢復後,如果沒有持久化分配給batch job的block資訊, 那麼會將所有資料都分配給一個batch job執行, 那麼相當於job2的資料被執行輸出了2次。
Spark Streaming框架,只能保證流入的資料不丟失,且崩潰前正在執行的batch job,崩潰後恢復時,分配給這個batch job的資料(無論從資料內容,還是資料大小),都要與崩潰前跑的這個batch job完全一致(具體就是使用WAL實現)。至於輸入源是否會重複傳送資料給Spark Streaming框架,Spark Streaming框架完全不可控。
總的來說, Spark Streaming整體架構實現Exactly Once, 是要依靠"輸入源", "計算框架自身", "輸出操作"共同協調完成的, 如果沒有Kafka direct API, 即使計算框架自身實現了WAL,也無法提供真正的Exactly Once語義(其實Kafka direct API已經可以自行實現Exactly Once語義而不需要拋掉WAL了, 不過實現方式其實是將輸入源與計算框架做了融合)。