1. 程式人生 > >Kafka高可用詳解

Kafka高可用詳解

Kafka的高可用

目錄

 

正文

回到頂部

一、高可用的由來

1.1 為何需要Replication

  在Kafka在0.8以前的版本中,是沒有Replication的,一旦某一個Broker宕機,則其上所有的Partition資料都不可被消費,這與Kafka資料永續性及Delivery Guarantee的設計目標相悖。同時Producer都不能再將資料存於這些Partition中。

  如果Producer使用同步模式則Producer會在嘗試重新發送message.send.max.retries(預設值為3)次後丟擲Exception,使用者可以選擇停止傳送後續資料也可選擇繼續選擇傳送。而前者會造成資料的阻塞,後者會造成本應發往該Broker的資料的丟失。

  如果Producer使用非同步模式,則Producer會嘗試重新發送message.send.max.retries(預設值為3)次後記錄該異常並繼續傳送後續資料,這會造成資料丟失並且使用者只能通過日誌發現該問題。同時,Kafka的Producer並未對非同步模式提供callback介面。

  由此可見,在沒有Replication的情況下,一旦某機器宕機或者某個Broker停止工作則會造成整個系統的可用性降低。隨著叢集規模的增加,整個叢集中出現該類異常的機率大大增加,因此對於生產系統而言Replication機制的引入非常重要。

1.2 Leader Election

  引入Replication之後,同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader互動,其它Replica作為Follower從Leader中複製資料。

  因為需要保證同一個Partition的多個Replica之間的資料一致性(其中一個宕機後其它Replica必須要能繼續服務並且即不能造成資料重複也不能造成資料丟失)。如果沒有一個Leader,所有Replica都可同時讀/寫資料,那就需要保證多個Replica之間互相(N×N條通路)同步資料,資料的一致性和有序性非常難保證,大大增加了Replication實現的複雜性,同時也增加了出現異常的機率。而引入Leader後,只有Leader負責資料讀寫,Follower只向Leader順序Fetch資料(N條通路),系統更加簡單且高效。

回到頂部

二、Kafka HA設計解析

2.1 如何將所有Replica均勻分佈到整個叢集

為了更好的做負載均衡,Kafka儘量將所有的Partition均勻分配到整個叢集上。一個典型的部署方式是一個Topic的Partition數量大於Broker的數量。同時為了提高Kafka的容錯能力,也需要將同一個Partition的Replica儘量分散到不同的機器。實際上,如果所有的Replica都在同一個Broker上,那一旦該Broker宕機,該Partition的所有Replica都無法工作,也就達不到HA的效果。同時,如果某個Broker宕機了,需要保證它上面的負載可以被均勻的分配到其它倖存的所有Broker上。

Kafka分配Replica的演算法如下:

1.將所有Broker(假設共n個Broker)和待分配的Partition排序

2.將第i個Partition分配到第(i mod n)個Broker上

3.將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上

2.2 Data Replication(副本策略)

Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。

2.2.1 訊息傳遞同步策略

Producer在釋出訊息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少,Producer只將該訊息傳送到該Partition的Leader。Leader會將該訊息寫入其本地Log。每個Follower都從Leader pull資料。這種方式上,Follower儲存的資料順序與Leader保持一致。Follower在收到該訊息並寫入其Log後,向Leader傳送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該訊息就被認為已經commit了,Leader將增加HW並且向Producer傳送ACK。

為了提高效能,每個Follower在接收到資料後就立馬向Leader傳送ACK,而非等到資料寫入Log中。因此,對於已經commit的訊息,Kafka只能保證它被存於多個Replica的記憶體中,而不能保證它們被持久化到磁碟中,也就不能完全保證異常發生後該條訊息一定能被Consumer消費。

Consumer讀訊息也是從Leader讀取,只有被commit過的訊息才會暴露給Consumer。

Kafka Replication的資料流如下圖所示:

2.2.2 ACK前需要保證有多少個備份

對於Kafka而言,定義一個Broker是否“活著”包含兩個條件:

  • 一是它必須維護與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機制來實現)。
  • 二是Follower必須能夠及時將Leader的訊息複製過來,不能“落後太多”。

Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的訊息落後於Leader後的條數超過預定值(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.max.messages配置,其預設值是4000)或者Follower超過一定時間(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.time.max.ms來配置,其預設值是10000)未向Leader傳送fetch請求。

Kafka的複製機制既不是完全的同步複製,也不是單純的非同步複製。事實上,完全同步複製要求所有能工作的Follower都複製完,這條訊息才會被認為commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而非同步複製方式下,Follower非同步的從Leader複製資料,資料只要被Leader寫入log就被認為已經commit,這種情況下如果Follower都複製完都落後於Leader,而如果Leader突然宕機,則會丟失資料。而Kafka的這種使用ISR的方式則很好的均衡了確保資料不丟失以及吞吐率。Follower可以批量的從Leader複製資料,這樣極大的提高複製效能(批量寫磁碟),極大減少了Follower與Leader的差距。

需要說明的是,Kafka只解決fail/recover,不處理“Byzantine”(“拜占庭”)問題。一條訊息只有被ISR裡的所有Follower都從Leader複製過去才會被認為已提交。這樣就避免了部分資料被寫進了Leader,還沒來得及被任何Follower複製就宕機了,而造成資料丟失(Consumer無法消費這些資料)。而對於Producer而言,它可以選擇是否等待訊息commit,這可以通過request.required.acks來設定。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的訊息就不會丟失。

2.2.3 Leader Election演算法

Leader選舉本質上是一個分散式鎖,有兩種方式實現基於ZooKeeper的分散式鎖:

  • 節點名稱唯一性:多個客戶端建立一個節點,只有成功建立節點的客戶端才能獲得鎖
  • 臨時順序節點:所有客戶端在某個目錄下建立自己的臨時順序節點,只有序號最小的才獲得鎖

一種非常常用的選舉leader的方式是“Majority Vote”(“少數服從多數”),但Kafka並未採用這種方式。這種模式下,如果我們有2f+1個Replica(包含Leader和Follower),那在commit之前必須保證有f+1個Replica複製完訊息,為了保證正確選出新的Leader,fail的Replica不能超過f個。因為在剩下的任意f+1個Replica裡,至少有一個Replica包含有最新的所有訊息。這種方式有個很大的優勢,系統的latency只取決於最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,為了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。如果要容忍1個follower掛掉,必須要有3個以上的Replica,如果要容忍2個Follower掛掉,必須要有5個以上的Replica。也就是說,在生產環境下為了保證較高的容錯程度,必須要有大量的Replica,而大量的Replica又會在大資料量下導致效能的急劇下降。這就是這種演算法更多用在ZooKeeper這種共享叢集配置的系統中而很少在需要儲存大量資料的系統中使用的原因。例如HDFS的HA Feature是基於majority-vote-based journal,但是它的資料儲存並沒有使用這種方式。

Kafka在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR裡的所有Replica都跟上了leader,只有ISR裡的成員才有被選為Leader的可能。在這種模式下,對於f+1個Replica,一個Partition能在保證不丟失已經commit的訊息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種模式是非常有利的。事實上,為了容忍f個Replica的失敗,Majority Vote和ISR在commit前需要等待的Replica數量是一樣的,但是ISR需要的總的Replica的個數幾乎是Majority Vote的一半。

雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優勢,但是Kafka作者認為Kafka可以通過Producer選擇是否被commit阻塞來改善這一問題,並且節省下來的Replica和磁碟使得ISR模式仍然值得。

2.2.4 如何處理所有Replica都不工作

在ISR中至少有一個follower時,Kafka可以確保已經commit的資料不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證資料不丟失了。這種情況下有兩種可行的方案:

1.等待ISR中的任一個Replica“活”過來,並且選它作為Leader

2.選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader

這就需要在可用性和一致性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者資料都丟失了,這個Partition將永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它並不保證已經包含了所有已commit的訊息,它也會成為Leader而作為consumer的資料來源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文件,在以後的版本中,Kafka支援使用者通過配置選擇這兩種方式中的一種,從而根據不同的使用場景選擇高可用性還是強一致性。

2.2.5 選舉Leader

最簡單最直觀的方案是,所有Follower都在ZooKeeper上設定一個Watch,一旦Leader宕機,其對應的ephemeral znode會自動刪除,此時所有Follower都嘗試建立該節點,而建立成功者(ZooKeeper保證只有一個能建立成功)即是新的Leader,其它Replica即為Follower。

但是該方法會有3個問題:

1.split-brain 這是由ZooKeeper的特性引起的,雖然ZooKeeper能保證所有Watch按順序觸發,但並不能保證同一時刻所有Replica“看”到的狀態是一樣的,這就可能造成不同Replica的響應不一致

2.herd effect 如果宕機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成叢集內大量的調整

3.ZooKeeper負載過重 每個Replica都要為此在ZooKeeper上註冊一個Watch,當叢集規模增加到幾千個Partition時ZooKeeper負載會過重。

Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式(比ZooKeeper Queue的方式更高效)通知需為為此作為響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。

回到頂部

三、HA相關ZooKeeper結構

3.1 admin

該目錄下znode只有在有相關操作時才會存在,操作結束時會將其刪除

/admin/reassign_partitions用於將一些Partition分配到不同的broker集合上。對於每個待重新分配的Partition,Kafka會在該znode上儲存其所有的Replica和相應的Broker id。該znode由管理程序建立並且一旦重新分配成功它將會被自動移除。

3.2 broker

即/brokers/ids/[brokerId])儲存“活著”的broker資訊。

topic註冊資訊(/brokers/topics/[topic]),儲存該topic的所有partition的所有replica所在的broker id,第一個replica即為preferred replica,對一個給定的partition,它在同一個broker上最多隻有一個replica,因此broker id可作為replica id。

3.3 controller

/controller -> int (broker id of the controller)儲存當前controller的資訊

/controller_epoch -> int (epoch)直接以整數形式儲存controller epoch,而非像其它znode一樣以JSON字串形式儲存。

回到頂部

四、producer釋出訊息

4.1 寫入方式

producer 採用 push 模式將訊息釋出到 broker,每條訊息都被 append 到 patition 中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障 kafka 吞吐率)。

4.2 訊息路由

producer 傳送訊息到 broker 時,會根據分割槽演算法選擇將其儲存到哪一個 partition。其路由機制為:

1、 指定了 patition,則直接使用;
2、 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition
3、 patition 和 key 都未指定,使用輪詢選出一個 patition。

4.3 寫入流程

producer 寫入訊息序列圖如下所示:

流程說明:

1、 producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader 
2、 producer 將訊息傳送給該 leader 
3、 leader 將訊息寫入本地 log 
4、 followers 從 leader pull 訊息,寫入本地 log 後 leader 傳送 ACK 
5、 leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 並向 producer 傳送 ACK

回到頂部

五、broker儲存訊息

5.1 儲存方式

物理上把 topic 分成一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應一個資料夾(該資料夾儲存該 patition 的所有訊息和索引檔案),如下:

5.2 儲存策略

無論訊息是否被消費,kafka 都會保留所有訊息。有兩種策略可以刪除舊資料:

1、 基於時間:log.retention.hours=168 
2、 基於大小:log.retention.bytes=1073741824

回到頂部

六、Topic的建立和刪除

6.1 建立topic

建立 topic 的序列圖如下所示:

流程說明:

1、 controller 在 ZooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被建立,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
2、 controller從 /brokers/ids 讀取當前所有可用的 broker 列表,對於 set_p 中的每一個 partition:
     2.1、 從分配給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,並將AR設定為新的 ISR 
     2.2、 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state 
3、 controller 通過 RPC 向相關的 broker 傳送 LeaderAndISRRequest。

6.2 刪除topic

刪除 topic 的序列圖如下所示:

流程說明:

1、 controller 在 zooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被刪除,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。 
2、 若 delete.topic.enable=false,結束;否則 controller 註冊在 /admin/delete_topics 上的 watch 被 fire,controller 通過回撥向對應的 broker 傳送 StopReplicaRequest。

回到頂部

七、broker failover

kafka broker failover 序列圖如下所示:

流程說明:

1、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點註冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch
2、 controller 從 /brokers/ids 節點讀取可用broker 
3、 controller決定set_p,該集合包含宕機 broker 上的所有 partition 
4、 對 set_p 中的每一個 partition 
    4.1、 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR 
    4.2、 決定新 leader 
    4.3、 將新 leader、ISR、controller_epoch 和 leader_epoch 等資訊寫入 state 節點
5、 通過 RPC 向相關 broker 傳送 leaderAndISRRequest 命令

回到頂部

八、controller failover

當 controller 宕機時會觸發 controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節點註冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試建立新的 controller path,只有一個競選成功並當選為 controller。

當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成如下操作:

1、 讀取並增加 Controller Epoch。 
2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上註冊 watcher。 
3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上註冊 watcher。 
4、 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上註冊 watcher。 
5、 若 delete.topic.enable=true(預設值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上註冊 watcher。 
6、 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上註冊Watch。 
7、 初始化 ControllerContext 物件,設定當前所有 topic,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等。 
8、 啟動 replicaStateMachine 和 partitionStateMachine。 
9、 將 brokerState 狀態設定為 RunningAsController。 
10、 將每個 partition 的 Leadership 資訊傳送給所有“活”著的 broker。 
11、 若 auto.leader.rebalance.enable=true(預設值是true),則啟動 partition-rebalance 執行緒。 
12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。