1. 程式人生 > >Spark Streaming對Exactly Once的實現原理

Spark Streaming對Exactly Once的實現原理

昨天看到了這篇文章: 為什麼Spark Streaming + Kafka很難保證exactly once? 看過後,對作者對Exactly Once的理解不敢苟同,所以想寫這篇文章,闡述一下我對Spark Streaming保證Exactly Once語義的理解。

Exactly Once實現的整體性

首先一個很重要的道理是: 整個系統對exactly once的保證,從來都不是靠系統中某一部分來實現就能搞定的,需要整個流式系統一起努力才可以實現。

對Spark Streaming來說, Exactly once的實現,需要系統中三部分的整體保證: 

    輸入源 --> Spark Streaming計算 ---> 輸出操作

    "輸入源"對於exactly once的實現: Kafka的directly API其實就是在解決輸入源輸入資料的exactly once語義;

    "Spark Streaming"部分的exactly once的shi實現: 使用WAL保證(注意我沒有提checkpoint和replication, 因為這兩個failover機制,並不是專門解決exactly once這個問題的)。

    "輸出操作"對於exactly once的實現: 需要輸出結果保證冪等性, 這點官方文件已經說的比較清楚:

In order to achieve exactly-once semantics for output of your results,

 your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).
 其中, "輸入源"對於exactly once的實現可以專門再找專題講述, 不涉及對本文核心內容的理解; "輸出操作"對exactly once的實現很好理解,無非就是獨立出每次batch計算的結果並保證可重入, 所以本文重點介紹Spark Stream核心對Exactly Once的實現原理。

Spark Streaming計算框架對failover的實現

從網上能查到的千篇一律的Streaming計算邏輯,我這裡就不講了, 重點介紹一下計算框架對failover的處理部分。

計算框架使用3種方式來實現整體的failover機制:

    1 checkpoint(注意,這個checkpoint和RDD的checkpoint是兩碼事): 在Driver層實現, 用於在Driver崩潰後,恢復Driver的現場;

    2 replication: 在Receiver中用於解決單臺executor掛掉後,未儲存的資料丟失的問題。

    3 WAL: 在Driver和Receiver中實現,用於解決:

      (1) Driver掛掉,所有executor都會掛掉,那麼所有未儲存的資料都丟掉了,replication就不管用了;

      (2) Driver掛掉後,  哪些block在掛掉前註冊到了driver中,以及掛掉前哪些block分配給了當時正在執行的batch job, 這些資訊就都丟失了;所以需要WAL對這些資訊做持久化。

其中, 第3個機制中的第2條,才是計算框架用於實現Exactly Once的努力, 下面針對這一點詳細展開。

WAL解決資料丟失問題的原理

Driver對輸入資料處理的步驟:

    1 addBlock: 將輸入資料儲存轉化為block,並儲存到blockQueue中; //在ReceiverTrackerEndpoint執行緒處理;
    2 allocateBlocksToBatch: 將當前所有未處理的block,分配給batch,然後刪除blockQueue中的所有block;   //在JobGenerator的eventLoop執行緒中處理;

3 用分給本次batch的所有資料進行job計算; //在JobHandler執行緒中處理;
上面的前2個步驟,完全是在2個不同的執行緒裡非同步執行,而且會對資料的形式進行變換(buffer -> block -> batch data), 
每一步的完成都會刪除前面一種資料形式的資料。所以,前2步,都會進行WAL操作,持久化資料。
簡單說,WAL中儲存的資料是這樣的:
       A             B                    C                             D              E
   addblock1 --> addblock2 --> 將所有block分給batch然後刪除所有block --> addblock3 --> addblock4....
這樣任何一個階段driver崩潰再恢復的時候,根據WAL,就可以恢復當時的資料;
舉例來說:
    1 當執行到C時, 做了C的WAL儲存,然後開始執行對應的batch job, 這時Driver掛掉了; 再restart恢復時,發現WAL裡面
有A -> B -> C, 會一次重放執行, 也就是先將block1和block2放入blockQueue,然後將這2個block分配個batch job,然後刪除
掉blockQueue裡的這兩個block;這樣資料就恢復到了崩潰之前的現場;
    2 當執行到E時,Driver崩潰, 那麼重新恢復後,一次執行A,B,C,D,E; 執行結果就是, block1和block2分配給了batch job
並執行,block3和block4存放在blockQueue中,等待下次到時分配給下一個batch job.

Spark Streaming計算框架無法解決的job執行一半的問題

但這裡仍無法解決一個問題,就是如果job執行一半(比如已經寫了一部分到結果資料庫中),driver崩潰,再恢復時,這個job會重新執行,但是上一次執行一半的job已經寫了部分到資料庫了。
解決這個問題,就需要"輸出操作"是冪等的,這就不是Spark Streaming解決的問題了,需要應用程式自己來保證。 

總結

輸入源對於Exactly Once需要實現的是:  崩潰前一部分資料已經作為block1輸入到Spark了, 那麼崩潰後恢復,這部分資料不能再輸入到spark中;

Spark Streaming計算框架對於Exactly Once需要實現的是:  接收輸入資料以及分配給Batch job資料,這兩部分的持久化一步都不能少,因為流入資料形成block和將block資料分給Batch,是分離的兩個步驟,沒有事務化;這其實也是本質上造成必須有Kafka Direct API的原因。比如只做了持久化block,但是沒有持久化哪些block應該分配給batch job,那麼下次恢復時,哪些資料該屬於這個batch job,就完全不可知。最要命的是, 比如正在跑2個batch job,先跑的job1, 再跑的job2; 如果job2先跑完了,job1還在跑,這時崩潰了,那麼恢復後,如果沒有持久化分配給batch job的block資訊, 那麼會將所有資料都分配給一個batch job執行, 那麼相當於job2的資料被執行輸出了2次。
Spark Streaming框架,只能保證流入的資料不丟失,且崩潰前正在執行的batch job,崩潰後恢復時,分配給這個batch job的資料(無論從資料內容,還是資料大小),都要與崩潰前跑的這個batch job完全一致(具體就是使用WAL實現)。至於輸入源是否會重複傳送資料給Spark Streaming框架,Spark Streaming框架完全不可控。

總的來說, Spark Streaming整體架構實現Exactly Once, 是要依靠"輸入源", "計算框架自身", "輸出操作"共同協調完成的, 如果沒有Kafka direct API, 即使計算框架自身實現了WAL,也無法提供真正的Exactly Once語義(其實Kafka direct API已經可以自行實現Exactly Once語義而不需要拋掉WAL了, 不過實現方式其實是將輸入源與計算框架做了融合)。