kakafka - 為CQRS而生
前段時間跟一個朋友聊起kafka,flint,spark這些是不是某種分散式運算框架。我自認為的分散式運算框架最基礎條件是能夠把多個叢集節點當作一個完整的系統,然後程式好像是在同一臺機器的記憶體裡執行一樣。當然,這種整合實現方式有賴於底層的一套訊息系統。這套訊息系統可以把訊息隨意在叢集各節點之間自由傳遞。所以如果能夠通過訊息來驅動某段程式的執行,那麼這段程式就有可能在叢集中任何一個節點上運行了。好了,akka-cluster是通過對每個叢集節點上的中介傳送訊息使之調動該節點上某段程式執行來實現分散式運算的。那麼,kafka也可以實現訊息在叢集節點間的自由流通,是不是也是一個分散式運算框架呢?實際上,kafka設計強調的重點是訊息的接收,或者叫訊息消費機制。至於接收訊息後怎麼去應對,用什麼方式處理,都是kafka使用者自己的事了。與分散式運算框架像akka-cluster對比,kafka還缺了個在每個叢集節點上的”運算排程中介“,所以kafka應該不算我所指的分散式運算框架,充其量是一種分散式的訊息傳遞系統。實際上kafka是一種高吞吐量、高可用性、安全穩定、有良好口碑的分散式訊息系統。
kafka的本質是一種commit-log,或者“事件記錄系統”:上游產生的資料(即事件)會按發生時間順序存入kafka,然後下游可以對任何時間段內事件按序進行讀取,重演運算產生那段時間內的某種狀態。這不就是妥妥的CQRS模式嗎?當然kafka也可以使用在其它一些場景如:訊息佇列,資料儲存等,不過這些都是commit-log的具體應用。
常常看到網上有朋友抱怨akka-cluster的一些處理方式太底層或太基礎了。使用者往往需要自己來增加一些方法來確保使用安全。我想作為一種訊息驅動系統,如何保證akka訊息的正確產生和安全使用應該是最基本的要求。而恰恰akka是沒有提供對訊息遺漏和重複訊息的保障機制。我想這也是造成akka使用者擔心的主要原因。上面提到kafka是一種高吞吐量、高可用性、安全穩定的分散式訊息系統,特別是它提供了對exactly-once,“保證一次”的訊息使用支援。那麼通過kafka實現一套CQRS模式的實時交易處理系統應該是可行的。這也是我使用kafka的主要目的。
上面提到,希望能充分利用kafka commit-log特性來開發一個基於CQRS的實時交易系統,比如支付系統、庫存管理系統,從實踐中瞭解kafka。kafka支援多種語言終端,怪異的是沒有scala終端。kafka是用scala開發的,不提供scala終端實在是說不通啊。不過akka在alpakka社群提供了alpakka-kafka:這個東西是個基於akka-streams的kafka scala終端程式設計工具,稍微過了一下,感覺功能比較全面,那就是它了。不過在開始前先把kafka的原理和基本情況做個介紹:
從表面上看kafka就是一個簡單的訊息儲存和傳遞工具。不過因為其特殊分散式的訊息釋出、儲存、讀取處理機制,使其成為一種高吞吐量、高可用性、安全穩定的分散式訊息處理工具。從應用角度來講,kafka應用包括三個方面,kafka本身,就叫kafka引擎吧,釋出終端、訂閱終端,即:kafka,writer,reader三部分,其中:所有複雜的功能實現是包嵌在kafka內部的,writer,reader應該整合到使用者應用裡。kafka的作業是圍繞著訊息的釋出訂閱/讀寫進行的。所謂訊息即CQRS模式裡的事件。那麼kafka的工作原理直白點就是writer向kafka寫事件,kafka把事件按發生時間順序儲存,reader再按順序從kafka讀取事件並進行處理以產生新的業務狀態,如在某個庫位的一個商品數量得到了更新。當然原理看似簡單,但具體的實現才是真正複雜的地方。
首先,writer和reader是以事件關聯的,即:write釋出某種型別的事件,而reader則是訂閱相同型別的事件。 這裡的事件也就是topic,或一項業務,如:圖書類當前庫存。為了提高資料吞吐量,每個topic又可以細分為多個partition。每個partition分擔所屬topic訊息型別下的一些指定的細分類訊息或者事件,如"圖書庫房101"。如果把這些partition再分佈到一個叢集的節點上,就可以實現高吞吐量的分散式讀寫,然後通過叢集partition的複本同步又可以達到資料安全及系統高可用性的目的。這些叢集節點就是所謂的broker了。釋出訊息內容由topic,key,value所組成。其中key值指定該訊息應該寫入那個partition,即通過對key進行hash計算得出partition id。hash演算法可以保證相同的key值永遠指定同一個partition。值得注意的是kafka保證每個partition上的事件肯定按照發生時間排序,所以要保證一種事件只能寫入同一個partition。當然,一個partition可以承載多種事件。要注意的是建立topic和partition都是嚴格的管理工作admin,不是在某些程式中任意進行增減的。一般來講,在建立一個新topic時就要確定它下面的partition數量了。這個partition數量要按照對資料吞吐量需求設定。但一般是叢集節點的倍數,這樣partition可以均勻分佈在各broker上。
好了,該到reader這頭了:reader作業從訂閱某個topic開始。上面提過:一個topic下面可能有多個partition,但每個partition都會包含topic的其中幾個子業務的全部事件,而且這些事件是嚴格按發生時間排序的。kafka有個reader group這麼個概念:針對同一個topic,容許有一組多個reader對這個topic下的partition進行讀取。但每個partition只容許組內一個reader讀取。至於goup內reader是如何分配partition的完全由kafka內部解決。如果發現新partition或者組內reader有增減變化,kafka會自動進行再分配rebalance。所以總的來說訂閱某個topic的一個組內reader應該負責那個partition是不確定的,加上隨時可能發生動態再分配的情況,比如組內某個reader出問題倒了。換言之組內所有reader都必須具備處理整個topic所有型別業務的能力,如此才能解決組內reader-partition關係不確定的難題。kafka最重要的特點就是可以容許不同的應用通過不同的reader-group對同一個partition上的事件進行任意讀取,本意應該是不同的應用可以利用同一個業務事件序列進行不同的業務處理。具體實現方式應該是每個組對某個partition上事件最後讀取的位置分別進行了登記,offset-commit。這樣,即使發生了重新分配rebalance組內任何一個reader對分配到的partition應從那個位置開始讀還是確定的。這個offset-commit方式描述了幾種事件讀取模式:
1、at-most-once, 最多一次:如果剛讀取事件,在進行業務處理之前就登記位置commit-offset,那麼commit-offset後位置已經登記,即使業務處理失敗也再也不可能二次讀取了。
2、at-least-once,最少一次:讀取事件、完成業務處理後才commit-offset。如果處理業務中系統故障,只能從上次登記的位置重新讀取了,那麼就會出現重複讀取的情況。
3、exactly-once, 保證只一次:控制commit-offset的時間節點是取得at-most-once, at-least-once之間安全係數的一種方式。但exactly-once不容許有模糊地帶。具體做法是把業務處理和commit-offset作為一個完整事物單元來處理(atomic-transaction)。兩樣操作同時成功或失敗。
我覺著kafka的exactly-once能力最值得推介。因為在akka或者其它訊息佇列工具裡不容易得到保證。而在一個訊息驅動的實時交易系統裡,保證事件重演能正確反映當時狀態是關鍵。任何事件遺失或重複都會造成不可逆轉的誤差。那麼下面的一系列討論我就會嘗試用alpakka-kafka來構建一個基於CQRS模式的實時交易系統,並和大家進行交流分