圖解kafka - 設計原理解析
什麼是訊息佇列?
簡單來說,訊息佇列是存放訊息的容器。客戶端可以將訊息傳送到訊息伺服器,也可以從訊息伺服器獲取訊息。
問題導讀:
*********
- 為什麼需要訊息系統?
- kafka架構?
- kafka如何儲存訊息?
- Producer如何傳送訊息?
- Consumer如何消費訊息?
- Offset如何儲存?
- 如何保證訊息不被重複消費?
- 如何保證訊息的可靠性傳輸?
- 如何保證訊息的順序性?
為什麼需要訊息系統?
削峰
資料庫的處理能力是有限的,在峰值期,過多的請求落到後臺,一旦超過系統的處理能力,可能會使系統掛掉。
如上圖所是,系統的處理能力是2k/s,MQ處理能力是8k/s,峰值請求5k/s,MQ的處理能力遠遠大於資料庫,在高峰期,請求可以先積壓在MQ中,系統可以根據自身的處理能力以2k/s的速度消費這些請求。這樣等高峰期一過,請求可能只有100/s,系統可以很快的消費掉積壓在MQ中的請求。
注意,上面的請求指的是寫請求,查詢請求一般通過快取解決。
解耦
如下場景,S系統與A、B、C系統緊密耦合。由於需求變動,A系統修改了相關程式碼,S系統也需要調整A相關的程式碼;過幾天,C系統需要刪除,S緊跟著刪除C相關程式碼;又過了幾天,需要新增D系統,S系統又要新增與D相關的程式碼;再過幾天,程式猿瘋了...
這樣各個系統緊密耦合,不利於維護,也不利於擴充套件。現在引入MQ,A系統變動,A自己修改自己的程式碼即可;C系統刪除,直接取消訂閱;D系統新增,訂閱相關訊息即可。
這樣通過引入訊息中介軟體,使各個系統都與MQ互動,從而避免它們之間的錯綜複雜的呼叫關係。
Kafka架構
相關概念
1. broker kafka 叢集中包含的伺服器。 2. producer 訊息生產者。 3. consumer 訊息消費者 4. consumer group 每個 consumer 都屬於一個 consumer group,每條訊息只能被 consumer group 中的一個 consumer 消費,但可以被多個 consumer group 消費。 5. topic 訊息的類別。每條訊息都屬於某個topic,不同的topic之間是相互獨立的,即kafka是面向topic的。 6. partition 每個topic分為多個partition,partition是kafka分配的單位。kafka物理上的概念,相當於一個目錄,目錄下的日誌檔案構成這個partition。 7. replica partition的副本,保障 partition 的高可用。 8. leader replica 中的一個角色, producer 和 consumer 只跟 leader 互動。 9. follower replica 中的一個角色,從 leader 中複製資料。 10. controller kafka 叢集中的其中一個伺服器,用來進行 leader election 以及 各種 failover。 12. zookeeper kafka 通過 zookeeper 來儲存叢集的 meta 資訊。
Topic and Logs
Message是按照topic來組織的,每個topic可以分成多個partition(對應server.properties/num.partitions)。partition是一個順序的追加日誌,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障 kafka 吞吐率)。其結構如下
server.properties/num.partitions 表示檔案 server.properties 中的 num.partitions 配置項,下同
partition中的每條記錄(message)包含三個屬性:offset, messageSize和data。其中offset表示訊息偏移量;messageSize表示訊息的大小;data表示訊息的具體內容。
partition是以檔案的形式儲存在檔案系統中,位置由server.properties/log.dirs指定,其命名規則為<topic_name>-<partition_id>。
比如,topic為"page_visits"的訊息,分為5個partition,其目錄結構為:
partition可能位於不同的broker上
partition是分段的,每個段是一個segment檔案。segment的常用配置有:
#server.properties
#segment檔案的大小,預設為 1G
log.segment.bytes=1024*1024*1024
#滾動生成新的segment檔案的最大時長
log.roll.hours=24*7
#segment檔案保留的最大時長,超時將被刪除
log.retention.hours=24*7
partition目錄下包括了資料檔案和索引檔案,下圖是某個partition的目錄結構:
index採用稀疏儲存的方式,它不會為每一條message都建立索引,而是每隔一定的位元組數建立一條索引,避免索引檔案佔用過多的空間。缺點是沒有建立索引的offset不能一次定位到message的位置,需要做一次順序掃描,但是掃描的範圍很小。
索引包含兩個部分(均為4個位元組的數字),分別為相對offset和position。相對offset表示segment檔案中的offset,position表示message在資料檔案中的位置。
總結:Kafka的Message儲存採用了分割槽(partition),磁碟順序讀寫,分段(LogSegment)和稀疏索引這幾個手段來達到高效性
Partition and Replica
一個topic物理上分為多個partition,位於不同的broker上。如果沒有 replica,一旦broker宕機,其上所有的patition將不可用。
每個partition可以有多個replica(對應server.properties/default.replication.factor),分配到不同的broker上,其中有一個leader負責讀寫,處理來自producer和consumer的請求;其它作為follower從leader pull訊息,保持與leader的同步。
如何分配partition和replica到broker上
- 將所有Broker(假設共n個Broker)和待分配的Partition排序
- 將第i個Partition分配到第(i mod n)個Broker上
- 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
根據上面的分配規則,若replica的數量大於broker的數量,必定會有兩個相同的replica分配到同一個broker上,產生冗餘。因此replica的數量應該小於或等於broker的數量。
leader選舉
kafka 在 zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)動態維護了一個 ISR(in-sync replicas),ISR 裡面的所有 replica 都"跟上"了 leader,controller將會從ISR裡選一個做leader。具體流程如下:
1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點註冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch
2. controller 從 /brokers/ids 節點讀取可用broker
3. controller決定set_p,該集合包含宕機 broker 上的所有 partition
4. 對 set_p 中的每一個 partition
4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR
4.2 決定新 leader
4.3 將新 leader、ISR、controller_epoch 和 leader_epoch 等資訊寫入 state 節點
5. 通過 RPC 向相關 broker 傳送 leaderAndISRRequest 命令
當ISR為空時,會選一個 replica(不一定是 ISR 成員)作為leader;當所有的 replica 都歇菜了,會等任意一個 replica 復活,將其作為leader。
ISR(同步列表)中的follower都"跟上"了leader,"跟上"並不表示完全一致,它由 server.properties/replica.lag.time.max.ms 配置,表示leader等待follower同步訊息的最大時間,如果超時,leader將follower移除ISR。
配置項 replica.lag.max.messages 已經移除
replica同步
kafka通過"拉模式"同步訊息,即follower從leader批量拉取資料來同步。具體的可靠性,是由生產者(根據配置項producer.properties/acks)來決定的。
In Kafka 0.9, request.required.acks=-1 which configration of producer is replaced by acks=all, but this old config is remained in docs.(在0.9版本,生產者配置項 request.required.acks=-1 被 acks=all 取代,但是老的配置項還保留在文件中。ps: 最新的文件2.2.x request.required.acks 已經不存在了)
acks | description |
---|---|
0 | producer傳送訊息後直接返回,不會等待伺服器確認 |
1 | 伺服器將記錄寫進本地log後返回,不會等待follower同步訊息。leader宕機後可能丟失一部分未同步的訊息 |
-1/all | 伺服器將記錄寫進本地log後,等待所有ISR內的訊息同步後返回。除非leader和所有的ISR都掛掉,否則訊息不會丟失 |
在acks=-1的時候,如果ISR少於min.insync.replicas指定的數目,將會丟擲NotEnoughReplicas或NotEnoughReplicasAfterAppend異常。
Prodecer如何傳送訊息
Producer首先將訊息封裝進一個ProducerRecord例項中。
訊息路由
- 傳送訊息時如果指定了partition,則直接使用;
- 如果指定了key,則對key進行雜湊,選出一個partition。這個hash(即分割槽機制)由producer.properties/partitioner.class指定的類實現,這個路由類需要實現Partitioner介面;
- 如果都未指定,通過round-robin來選partition。
訊息並不會立即傳送,而是先進行序列化後,傳送給Partitioner,也就是上面提到的hash函式,由Partitioner確定目標分割槽後,傳送到一塊記憶體緩衝區中(傳送佇列)。Producer的另一個工作執行緒(即Sender執行緒),則負責實時地從該緩衝區中提取出準備好的訊息封裝到一個批次內,統一發送到對應的broker中。其過程大致是這樣的:
圖片來自123archu
Consumer
每個Consumer都劃歸到一個邏輯Consumer Group中,一個partition只能被同一個Consumer Group中的一個Consumer消費,但可以被不同的Consumer Group消費。
若 topic 的 partition 數量為 p,Consumer Group 中訂閱此 topic 的 consumer 數量為 c; 則:
p < c: 會有 c - p 個 consumer閒置,造成浪費
p > c: 一個 consumer 對應多個 partition
p = c: 一個 consumer 對應一個 partition
應該合理分配consumer和partition的數量,避免造成資源傾斜,最好partiton數目是consumer數目的整數倍。
如何將Partition分配給Consumer
生產過程中broker要分配partition,消費過程這裡,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用於分配partition。
當 partition 或 consumer 數量發生變化時,比如 增加 consumer, 減少 consumer(主動或被動),增加 partition,都會進行 rebalance。
其過程如下:
consumer 給 coordinator 傳送 JoinGroupRequest 請求。這時其他consumer 發 heartbeat 請求過來時,coordinator 會告訴他們,要 rebalance了。其他 consumer 也傳送 JoinGroupRequest 請求。
coordinator在consumer中選出一個leader,其他作為 follower,通知給各個 consumer,對於leader,還會把 follower 的 metadata 帶給它。
consumer leader 根據 consumer metadata 重新分配 partition
consumer向coordinator傳送SyncGroupRequest,其中leader的SyncGroupRequest會包含分配的情況。coordinator回包,把分配的情況告訴consumer,包括leader。
Consumer Fetch Message
Consumer 採用"拉模式"消費訊息,這樣 consumer 可以自行決定消費的行為。
Consumer 呼叫 poll(duration) 從伺服器拉取訊息。拉取訊息的具體行為由下面的配置項決定:
#consumer.properties
#消費者最多 poll 多少個 record
max.poll.records=500
#消費者 poll 時 partition 返回的最大資料量
max.partition.fetch.bytes=1048576
#Consumer 最大 poll 間隔
#超過此值伺服器會認為此 consumer failed
#並將此 consumer 踢出對應的 consumer group
max.poll.interval.ms=300000
在 partition 中,每個訊息都有一個 offset。新訊息會被寫到 partition 末尾(最新的一個 segment 檔案末尾), 每個 partition 上的訊息是順序消費的,不同的 partition 之間訊息的消費順序是不確定的。
若一個 consumer 消費多個 partition, 則各個 partition 之前消費順序是不確定的,但在每個 partition 上是順序消費。
若來自不同 consumer group 的多個 consumer 消費同一個 partition,則各個 consumer 之間的消費互不影響,每個 Consumer 都會有自己的 offset。
Consumer A 和 Consumer B 屬於不同的 Consumer Group。Cosumer A 讀取到 offset = 9, Consumer B 讀取到 offset = 11,這個值表示下次讀取的位置。也就是說 Consumer A 已經讀取了 offset 為 0 ~ 8 的訊息,Consumer B 已經讀取了 offset 為 0 ~ 10 的訊息。
下次從 offset = 9 開始讀取的 Consumer 並不一定還是 Consumer A 因為可能發生 rebalance
offset的儲存
Consumer 消費 partition 時,需要儲存 offset 記錄當前消費位置。
offset 可以選擇自動提交或呼叫 Consumer 的 commitSync() 或 commitAsync() 手動提交,相關配置為:
#是否自動提交 offset
enable.auto.commit=true
#自動提交間隔。 enable.auto.commit=true 時有效
auto.commit.interval.ms=5000
offset 儲存在名叫 __consumeroffsets 的 topic 中。寫訊息的 key 由 groupid、topic、partition 組成,value 是 offset。
一般情況下,每個 key 的 offset 都是快取在記憶體中,查詢的時候不用遍歷partition,如果沒有快取,第一次就會遍歷 partition 建立快取,然後查詢返回。
__consumeroffsets 的 partition 數量由下面的 server 配置決定:
offsets.topic.num.partitions=50
offset 儲存在哪個分割槽上,即 __consumeroffsets 的分割槽機制,可以表示為:
groupId.hashCode() mode groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 是上面配置的分割槽數。
因為一個 partition 只能被同一個 Consumer Group 的一個 consumer 消費,因此可以用 groupId 表示此 consumer 消費 offeset 所在分割槽
訊息系統可能遇到那些問題
kafka支援3種訊息投遞語義
- at most once:最多一次,訊息可能會丟失,但不會重複
獲取資料 -> commit offset -> 業務處理 - at least once:最少一次,訊息不會丟失,可能會重複
獲取資料 -> 業務處理 -> commit offset。 - exactly once:只且一次,訊息不丟失不重複,只且消費一次(0.11中實現,僅限於下游也是kafka)
如何保證訊息不被重複消費?(訊息的冪等性)
對於更新操作,天然具有冪等性。
對於新增操作,可以給每條訊息一個唯一的id,處理前判斷是否被處理過。這個id可以儲存在 Redis 中,如果是寫資料庫可以用主鍵約束。
如何保證訊息的可靠性傳輸?(訊息丟失的問題)
根據kafka架構,有三個地方可能丟失訊息:Consumer,Producer和 Server
消費端弄丟了資料
當 server.properties/enable.auto.commit 設定為 true 的時候,kafka 會先 commit offset 再處理訊息,如果這時候出現異常,這條訊息就丟失了。
因此可以關閉自動提交 offset,在處理完成後手動提交 offset,這樣可以保證訊息不丟失;但是如果提交 offset 失敗,可能導致重複消費的問題, 這時保證冪等性即可。
Kafka弄丟了訊息
如果某個 broker 不小心掛了,此時若 replica 只有一個,broker 上的訊息就丟失了;若 replica > 1 ,給 leader 重新選一個 follower 作為新的 leader, 如果 follower 還有些訊息沒有同步,這部分訊息便丟失了。
可以進行如下配置,避免上面的問題:
- 給 topic 設定 replication.factor 引數:這個值必須大於 1,要求每個 partition 必須有至少 2 個副本。
- 在 Kafka 服務端設定 min.insync.replicas 引數:這個值必須大於 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯絡,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。
- 在 producer 端設定 acks=all:這個是要求每條資料,必須是寫入所有 replica 之後,才能認為是寫成功了。
- 在 producer 端設定 retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裡了。
Producer弄丟了訊息
在 producer 端設定 acks=all,保證所有的ISR都同步了訊息才認為寫入成功。
如何保證訊息的順序性?
kafka 中 partition 上的訊息是順序的,可以將需要順序消費的訊息傳送到同一個 partition 上,用單個 consumer 消費。
上面是學習kafka時總結的,如有錯誤或不合理的地方,歡迎指正!
參考:
1: kafka學習筆記:知識點整理
2: advanced-java
3: Kafka的Log儲存解析
4: kafka生產者Producer引數設定及引數調優建議-商業環境實戰系列
5: 震驚了!原來這才是kafka!
6: kafka configuration
7: kafka 2.3.0 API
8: kafka consumer 配置詳解和提交方