1. 程式人生 > 其它 >Kafka 分割槽討論

Kafka 分割槽討論

目錄

2.4.1 副本機制

同步節點定義:

宕機如何恢復

2.4.2 Leader選舉

如何選舉?

總結:

2.4.3 分割槽重新分配

2.4.4 自動再均衡

2.4.5 修改分割槽副本

2.4.6 分割槽分配策略

2.4.6.1 RangeAssignor

2.4.6.2 RoundRobinAssignor

2.4.6.3 StickyAssignor

2.4.6.4 自定義分配策略(瞭解)


 

4.2.5 Kafka 高階特性-分割槽

2.4.1 副本機制

Kafka在一定數量的伺服器上對主題分割槽進行復制。
當叢集中的一個broker宕機後系統可以自動故障轉移到其他可用的副本上,不會造成資料丟失。
 

1. 將複製因子為1的未複製主題稱為複製主題。
2. 主題的分割槽是複製的最小單元。
3. 在非故障情況下,Kafka中的每個分割槽都有一個Leader副本和零個或多個Follower副本。
4. 包括Leader副本在內的副本總數構成複製因子。 --replication-factor 3    => 1leader+2follower
5. 所有讀取和寫入都由Leader副本負責。
6. 通常,分割槽比broker多,並且Leader分割槽在broker之間平均分配。

Follower分割槽像普通的Kafka消費者一樣,消費來自Leader分割槽的訊息,並將其持久化到自己的日誌中。

允許Follower對日誌條目拉取進行批處理。

 

同步節點定義:

1. 節點必須能夠維持與ZooKeeper的會話(通過ZooKeeper的心跳機制)
2. 對於Follower副本分割槽,它複製在Leader分割槽上的寫入,並且不要延遲太多

Kafka提供的保證是,只要有至少一個同步副本處於活動狀態,提交的訊息就不會丟失。

 

宕機如何恢復

(1)少部分副本宕機
     leader宕機,會從follower選擇一個作為leader。當宕機的重新恢復時,會把之前commit的資料清空,重新從leader裡pull資料。
(2)全部副本宕機
    當全部副本宕機了有兩種恢復方式
     1、等待ISR中的一個恢復後,並選它作為leader。(等待時間較長,降低可用性)
     2、選擇第一個恢復的副本作為新的leader,無論是否在ISR中。(並未包含之前leader commit的資料,因此造成資料丟失)

 

2.4.2 Leader選舉

下圖中
分割槽P1的Leader是0,ISR是0和1
分割槽P2的Leader是2,ISR是1和2
分割槽P3的Leader是1,ISR是0,1,2。

生產者和消費者的請求都由Leader副本來處理。Follower副本只負責消費Leader副本的資料和Leader保持同步。
對於P1,如果0宕機會發生什麼?
Leader副本和Follower副本之間的關係並不是固定不變的,在Leader所在的broker發生故障的時候,就需要進行分割槽的Leader副本和Follower副本之間的切換,需要選舉Leader副本。

 

如何選舉?

如果某個分割槽所在的伺服器除了問題,不可用,kafka會從該分割槽的其他的副本中選擇一個作為新的Leader。之後所有的讀寫就會轉移到這個新的Leader上。現在的問題是應當選擇哪個作為新的Leader。
只有那些跟Leader保持同步的Follower才應該被選作新的Leader。
Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica,已同步的副本)的集合,該集合中是一些分割槽的副本。
只有當這些副本都跟Leader中的副本同步了之後,kafka才會認為訊息已提交,並反饋給訊息的生產者。
如果這個集合有增減,kafka會更新zookeeper上的記錄。

 

如果某個分割槽的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader。
顯然通過ISR,kafka需要的冗餘度較低,可以容忍的失敗數比較高。
假設某個topic有N+1個副本,kafka可以容忍N個伺服器不可用。

 

為什麼不用少數服從多數的方法
少數服從多數是一種比較常見的一致性算髮和Leader選舉法。
它的含義是隻有超過半數的副本同步了,系統才會認為資料已同步;
選擇Leader時也是從超過半數的同步的副本中選擇。
這種演算法需要較高的冗餘度,跟Kafka比起來,浪費資源。
譬如只允許一臺機器失敗,需要有三個副本;而如果只容忍兩臺機器失敗,則需要五個副本。
而kafka的ISR集合方法,分別只需要兩個和三個副本。

 

如果所有的ISR副本都失敗了怎麼辦?
此時有兩種方法可選,
  1. 等待ISR集合中的副本復活,
  2. 選擇任何一個立即可用的副本,而這個副本不一定是在ISR集合中。

  • 需要設定 unclean.leader.election.enable=true

這兩種方法各有利弊,實際生產中按需選擇。 如果要等待ISR副本復活,雖然可以保證一致性,但可能需要很長時間。而如果選擇立即可用的副本,則很可能該副本並不一致。

 

總結:

Kafka中Leader分割槽選舉,通過維護一個動態變化的ISR集合來實現,一旦Leader分割槽丟掉,則從ISR中隨機挑選一個副本做新的Leader分割槽。
如果ISR中的副本都丟失了,則:
    1. 可以等待ISR中的副本任何一個恢復,接著對外提供服務,需要時間等待。
    2. 從OSR中選出一個副本做Leader副本,此時會造成資料丟失

 


2.4.3 分割槽重新分配

向已經部署好的Kafka叢集裡面新增機器,我們需要從已經部署好的Kafka節點中複製相應的配置檔案,然後把裡面的broker id修改成全域性唯一的,最後啟動這個節點即可將它加入到現有Kafka叢集中。
問題:新新增的Kafka節點並不會自動地分配資料,無法分擔叢集的負載,除非我們新建一個topic。

需要手動將部分分割槽移到新新增的Kafka節點上,Kafka內部提供了相關的工具來重新分佈某個topic的分割槽。

在重新分佈topic分割槽之前,我們先來看看現在topic的各個分割槽的分佈位置:
1. 建立主題:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_01 --partitions 5 --replication-factor 1

2. 檢視主題資訊:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_01

3. 在node11搭建Kafka:
拷貝JDK並安裝到新結點上去

[root@node1 opt]# scp jdk-8u261-linux-x64.rpm node11:~ 

[root@node11 opt]# rpm -ivh jdk-8u261-linux-x64.rpm

拷貝node1上安裝的Kafka

[root@node1 opt]# scp -r kafka_2.12-1.0.2/ node11:/opt 

配置node11 上的環境變數,  此處不需要zookeeper,切記!!!

讓配置生效:

. /etc/profile 

修改node11上Kafka的配置:

啟動Kafka:

[root@node11 ~]# kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties

注意觀察node11上節點啟動的時候的ClusterId,看和zookeeper節點上的ClusterId是否一致,如果是,證明node11和node1在同一個叢集中。 node11啟動的Cluster ID:

 

zookeeper節點上的Cluster ID:

 

在node1上檢視zookeeper的節點資訊:

 

node11的節點已經加入叢集了。

 

4. 現在我們在現有叢集的基礎上再新增一個Kafka節點,然後使用Kafka自帶的 kafka-reassign-partitions.sh 工具來重新分佈分割槽。該工具有三種使用模式:

  • 1、generate模式,給定需要重新分配的Topic,自動生成reassign plan(並不執行)
  • 2、execute模式,根據指定的reassign plan重新分配Partition
  • 3、verify模式,驗證重新分配Partition是否成功

5. 我們將分割槽3和4重新分佈到broker1上,藉助kafka-reassign-partitions.sh工具生成reassign plan,不過我們先得按照要求定義一個檔案,裡面說明哪些topic需要重新分割槽,檔案內容如下:

[root@node1 ~]# cat topics-to-move.json

  1. {
  2.     "topics": [
  3.         {
  4.            "topic":"tp_re_01"
  5.        }
  6.     ],
  7.    "version":1
  8. }

 

然後使用 kafka-reassign-partitions.sh 工具生成reassign plan

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --topics-to-move-json-file topics-to-move.json --broker-list "0,1" --generate


Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}


Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}

 

 

 

Proposed partition reassignment configuration下面生成的就是將分割槽重新分佈到broker 1上的結果。我們將這些內容儲存到名為result.json檔案裡面(檔名不重要,檔案格式也不一定要以json為結尾,只要保證內容是json即可),然後執行這些reassign plan:

 

執行計劃:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --execute


Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}


Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.


[root@node1 ~]#

 

這樣Kafka就在執行reassign plan,我們可以校驗reassign plan是否執行完成:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --verify


Status of partition reassignment:
Reassignment of partition tp_re_01-1 completed successfully
Reassignment of partition tp_re_01-4 completed successfully
Reassignment of partition tp_re_01-2 completed successfully
Reassignment of partition tp_re_01-3 completed successfully
Reassignment of partition tp_re_01-0 completed successfully


[root@node1 ~]#

 

檢視主題的細節:

 

分割槽的分佈的確和操作之前不一樣了,broker 1上已經有分割槽分佈上去了。使用 kafka-reassign-partitions.sh 工具生成的reassign plan只是一個建議,方便大家而已。其實我們自己完全可以編輯一個reassign plan,然後執行它,如下:

  1. {
  2.   "version": 1,
  3.   "partitions": [{
  4.     "topic": "tp_re_01",
  5.     "partition": 4,
  6.     "replicas": [1],
  7.     "log_dirs": ["any"]
  8.  }, {
  9.     "topic": "tp_re_01",
  10.     "partition": 1,
  11.     "replicas": [0],
  12.     "log_dirs": ["any"]
  13.  }, {
  14.     "topic": "tp_re_01",
  15.     "partition": 2,
  16.     "replicas": [0],
  17.     "log_dirs": ["any"]
  18.  }, {
  19.     "topic": "tp_re_01",
  20.     "partition": 3,
  21.     "replicas": [1],
  22.     "log_dirs": ["any"]
  23.  }, {
  24.     "topic": "tp_re_01",
  25.     "partition": 0,
  26.     "replicas": [0],
  27.     "log_dirs": ["any"]
  28.  }]
  29. }

 

將上面的json資料檔案儲存到my-topics-to-execute.json檔案中,然後也是執行它:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file my-topics-to-execute.json --execute


Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

 

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file my-topics-to-execute.json --verify
Status of partition reassignment:
Reassignment of partition tp_re_01-1 completed successfully
Reassignment of partition tp_re_01-4 completed successfully
Reassignment of partition tp_re_01-2 completed successfully
Reassignment of partition tp_re_01-3 completed successfully
Reassignment of partition tp_re_01-0 completed successfully

 

等這個reassign plan執行完,我們再來看看分割槽的分佈:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_01


Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:
        Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: tp_re_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
        Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
        Topic: tp_re_01 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
        Topic: tp_re_01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1

[root@node1 ~]#

搞定!

 

 

2.4.4 自動再均衡

我們可以在新建主題的時候,手動指定主題各個Leader分割槽以及Follower分割槽的分配情況,即什麼分割槽副本在哪個broker節點上。

隨著系統的執行,broker的宕機重啟,會引發Leader分割槽和Follower分割槽的角色轉換,最後可能Leader大部分都集中在少數幾臺broker上,由於Leader負責客戶端的讀寫操作,此時集中Leader分割槽的少數幾臺伺服器的網路I/O,CPU,以及記憶體都會很緊張。

Leader和Follower的角色轉換會引起Leader副本在叢集中分佈的不均衡,此時我們需要一種手段,讓Leader的分佈重新恢復到一個均衡的狀態。

執行指令碼:

[root@node11 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"

 

上述指令碼執行的結果是:建立了主題tp_demo_03,有三個分割槽,每個分割槽兩個副本,Leader副本在列表中第一個指定的brokerId上,Follower副本在隨後指定的brokerId上。

 

然後模擬broker0宕機的情況:

# 通過jps找到Kafka程序PID
[root@node1 ~]# jps
54912 Jps
1699 QuorumPeerMain
1965 Kafka

 

# 直接殺死程序
[root@node1 ~]# kill -9 1965
[root@node1 ~]# jps
1699 QuorumPeerMain
54936 Jps
[root@node1 ~]#

 

# 檢視主題分割槽資訊:
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_demo_03

 

# 重新啟動node1上的Kafka
[root@node1 ~]# kafka-server-start.sh -daemon /opt/kafka_2.12-1.0.2/config/server.properties
[root@node1 ~]# jps
1699 QuorumPeerMain
55525 Kafka
55557 Jps
[root@node1 ~]#

 

# 檢視主題的分割槽資訊:
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_demo_03

# broker恢復了,但是Leader的分配並沒有變化,還是處於Leader切換後的分配情況。

 

是否有一種方式,可以讓Kafka自動幫我們進行修改?改為初始的副本分配?
此時,用到了Kafka提供的自動再均衡指令碼: kafka-preferred-replica-election.sh
先看介紹:

 

該工具會讓每個分割槽的Leader副本分配在合適的位置,讓Leader分割槽和Follower分割槽在伺服器之間均衡分配。

如果該指令碼僅指定zookeeper地址,則會對叢集中所有的主題進行操作,自動再平衡, 如下:

 

否則,  想要自定義, 具體操作如下:

1, 殺死node11 上的kafka-server, 檢視分割槽

 

2, 然後重新啟動, 檢視分割槽, Isr恢復, 但是Leader未改變

 

3, 希望分割槽1 恢復到最初設定, 則自定義json檔案:

  1. {
  2. "partitions": [
  3. {
  4. "topic":"tp_demo_03",
  5. "partition":1
  6. }
  7. ]
  8. }

 

4,  執行操作:

[root@node1 ~]# kafka-preferred-replica-election.sh --zookeeper node1:2181/myKafka --path-to-json-file preferred-replicas.json

 

5, 描述分割槽, 檢視結果

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_demo_03

恢復到最初的分配情況。

 

之所以是這樣的分配,是因為我們在建立主題的時候:

--replica-assignment "0:1,1:0,0:1" 

在逗號分割的每個數值對中排在前面的是Leader分割槽,後面的是副本分割槽。那麼所謂的preferred replica,就是排在前面的數字就是Leader副本應該在的brokerId。

 

 

2.4.5 修改分割槽副本

實際專案中,我們可能由於叢集的擴充套件,需要重新設定副本因子

topic一旦使用又不能輕易刪除重建,因此動態增加副本因子就成為最終的選擇。

 

說明:kafka 1.0版本配置檔案預設沒有default.replication.factor=x, 因此如果建立topic時,不指定–replication-factor, 預設1. 我們可以在自己的server.properties中配置上常用的副本因子,省去手動調整。例如設定default.replication.factor=3, 詳細內容可參考官方文件https://kafka.apache.org/documentation/#replication

原因分析:
假設我們有2個kafka broker分別broker0,broker1。

 

1. 當我們建立的topic有2個分割槽partition時並且replication-factor為1,基本上一個broker上一個分割槽。當一個broker宕機了,該topic就無法使用了,因為兩個個分割槽只有一個能用。
2. 當我們建立的topic有3個分割槽partition時並且replication-factor為2時,可能分割槽資料分佈情況是 broker0, partiton0,partiton1,partiton2, broker1, partiton1,partiton0,partiton2,

每個分割槽有一個副本,當其中一個broker宕機了,kafka叢集還能完整湊出該topic的兩個分割槽,例如當broker0宕機了,可以通過broker1組合出topic的兩個分割槽。

1. 建立主題:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_02 --partitions 3 --replication-factor 1

 

2. 檢視主題細節:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_02

 

3. 修改副本因子:錯誤方式

 

4. 使用 kafka-reassign-partitions.sh 修改副本因子:
-. 建立increment-replication-factor.json

  1. {
  2. "version":1,
  3. "partitions":[
  4. {"topic":"tp_re_02","partition":0,"replicas":[0,1]},
  5. {"topic":"tp_re_02","partition":1,"replicas":[0,1]},
  6. {"topic":"tp_re_02","partition":2,"replicas":[1,0]}
  7. ]
  8. }

 

5. 執行分配:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file increase-replication-factor.json --execute

 

6. 檢視主題細節:

7. 搞定!

 

 

2.4.6 分割槽分配策略

在Kafka中,每個Topic會包含多個分割槽,預設情況下一個分割槽只能被一個消費組下面的一個消費者消費,這裡就產生了分割槽分配的問題。Kafka中提供了多重分割槽分配演算法(PartitionAssignor)的實現:RangeAssignor、RoundRobinAssignor、StickyAssignor。

 

2.4.6.1 RangeAssignor

PartitionAssignor介面用於使用者定義實現分割槽分配演算法,以實現Consumer之間的分割槽分配。

消費組的成員訂閱它們感興趣的Topic並將這種訂閱關係傳遞給作為訂閱組協調者的Broker。協調者選擇其中的一個消費者來執行這個消費組的分割槽分配並將分配結果轉發給消費組內所有的消費者。Kafka預設採用RangeAssignor的分配演算法。

RangeAssignor對每個Topic進行獨立的分割槽分配。對於每一個Topic,首先對分割槽按照分割槽ID進行數值排序,然後訂閱這個Topic的消費組的消費者再進行字典排序,之後儘量均衡的將分割槽分配給消費者。這裡只能是儘量均衡,因為分割槽數可能無法被消費者數量整除,那麼有一些消費者就會多分配到一些分割槽。

大致演算法如下:

  1. assign(topic, consumers) {
  2. // 對分割槽和Consumer進行排序
  3. List<Partition> partitions = topic.getPartitions();
  4. sort(partitions);
  5. sort(consumers);
  6. // 計算每個Consumer分配的分割槽數
  7. int numPartitionsPerConsumer = partition.size() / consumers.size();
  8. // 額外有一些Consumer會多分配到分割槽
  9. int consumersWithExtraPartition = partition.size() % consumers.size();
  10. // 計算分配結果
  11. for (int i = 0, n = consumers.size(); i < n; i++) {
  12. // 第i個Consumer分配到的分割槽的index
  13. int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
  14. // 第i個Consumer分配到的分割槽數
  15. int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
  16. // 分裝分配結果
  17. assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
  18. }
  19. }

RangeAssignor策略的原理是按照消費者總數和分割槽總數進行整除運算來獲得一個跨度,然後將分割槽按照跨度進行平均分配,以保證分割槽儘可能均勻地分配給所有的消費者。對於每一個Topic,RangeAssignor策略會將消費組內所有訂閱這個Topic的消費者按照名稱的字典序排序,然後為每個消費者劃分固定的分割槽範圍,如果不夠平均分配,那麼字典序靠前的消費者會被多分配一個分割槽。

這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數量的增加,不均衡的問題會越來越嚴重,比如上圖中4個分割槽3個消費者的場景,C0會多分配一個分割槽。如果此時再訂閱一個分割槽數為4的Topic,那麼C0又會比C1、C2多分配一個分割槽,這樣C0總共就比C1、C2多分配兩個分割槽了,而且隨著Topic的增加,這個情況會越來越嚴重。

 

字典序靠前的消費組中的消費者比較“貪婪”, 往往多分一個。

 

2.4.6.2 RoundRobinAssignor

RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分割槽及所有消費者進行排序後儘量均衡的分配(RangeAssignor是針對單個Topic的分割槽進行排序分配的)。如果消費組內,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那麼分配結果是儘量均衡的(消費者之間分配到的分割槽數的差值不會超過1)。如果訂閱的Topic列表是不同的,那麼分配結果是不保證“儘量均衡”的,因為某些消費者不參與一些Topic的分配。

相對於RangeAssignor,在訂閱多個Topic的情況下,RoundRobinAssignor的方式能消費者之間儘量均衡的分配到分割槽(分配到的分割槽數的差值不會超過1——RangeAssignor的分配策略可能隨著訂閱的Topic越來越多,差值越來越大)。

對於消費組內消費者訂閱Topic不一致的情況:假設有兩個個消費者分別為C0和C1,有2個Topic T1、T2,分別擁有3和2個分割槽,並且C0訂閱T1和T2,C1訂閱T2,那麼RoundRobinAssignor的分配結果如下:

看上去分配已經儘量的保證均衡了,不過可以發現C0承擔了4個分割槽的消費而C1訂閱了T2一個分割槽,是不是把T2P0交給C1消費能更加的均衡呢?

 

2.4.6.3 StickyAssignor

動機

儘管RoundRobinAssignor已經在RangeAssignor上做了一些優化來更均衡的分配分割槽,但是在一些情況下依舊會產生嚴重的分配偏差,比如消費組中訂閱的Topic列表不相同的情況下。

更核心的問題是無論是RangeAssignor,還是RoundRobinAssignor,當前的分割槽分配演算法都沒有考慮上一次的分配結果。顯然,在執行一次新的分配之前,如果能考慮到上一次分配的結果,儘量少的調整分割槽分配的變動,顯然是能節省很多開銷的。

目標

從字面意義上看,Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”:

1. 分割槽的分配儘量的均衡
2. 每一次重分配的結果儘量與上一次分配結果保持一致

當這兩個目標發生衝突時,優先保證第一個目標。第一個目標是每個分配演算法都儘量嘗試去完成的,而第二個目標才真正體現出StickyAssignor特性的。

我們先來看預期分配的結構,後續再具體分析StickyAssignor的演算法實現。

例如:

  • 有3個Consumer:C0、C1、C2
  • 有4個Topic:T0、T1、T2、T3,每個Topic有2個分割槽
  • 所有Consumer都訂閱了這4個分割槽

StickyAssignor的分配結果如下圖所示(增加RoundRobinAssignor分配作為對比):

如果消費者1宕機,則按照RoundRobin的方式分配結果如下:
打亂從新來過,輪詢分配:

按照Sticky的方式:
僅對消費者1分配的分割槽進行重分配,紅線部分。最終達到均衡的目的。

 

再舉一個例子:

  • 有3個Consumer:C0、C1、C2
  • 3個Topic:T0、T1、T2,它們分別有1、2、3個分割槽
  • C0訂閱T0;C1訂閱T1;C2訂閱T1、T2

分配結果如下圖所示:

消費者0下線,則按照輪詢的方式分配:

按照Sticky方式分配分割槽,僅僅需要動的就是紅線部分,其他部分不動。

StickyAssignor分配方式的實現稍微複雜點兒,我們可以先理解圖示部分即可。感興趣的可以研究一下。

 

 

2.4.6.4 自定義分配策略(瞭解)

自定義的分配策略必須要實現org.apache.kafka.clients.consumer.internals.PartitionAssignor介面。PartitionAssignor介面的定義如下:

  1. Subscription subscription(Set<String> topics);
  2. String name();
  3. // 主要實現的是下面這個方法
  4. Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);
  5. void onAssignment(Assignment assignment);
  6. class Subscription {
  7. private final List<String> topics;
  8. private final ByteBuffer userData;
  9. }
  10. class Assignment {
  11. private final List<TopicPartition> partitions;
  12. private final ByteBuffer userData;
  13. }

PartitionAssignor介面中定義了兩個內部類:Subscription和Assignment。

Subscription類用來表示消費者的訂閱資訊,類中有兩個屬性:topics和userData,分別表示消費者所訂閱topic列表和使用者自定義資訊。PartitionAssignor介面通過subscription()方法來設定消費者自身相關的Subscription資訊,注意到此方法中只有一個引數topics,與Subscription類中的topics的相互呼應,但是並沒有有關userData的引數體現。為了增強使用者對分配結果的控制,可以在subscription()方法內部新增一些影響分配的使用者自定義資訊賦予userData,比如:權重、ip地址、host或者機架(rack)等等。

 

再來說一下Assignment類,它是用來表示分配結果資訊的,類中也有兩個屬性:partitions和userData,分別表示所分配到的分割槽集合和使用者自定義的資料。可以通過PartitionAssignor介面中的onAssignment()方法是在每個消費者收到消費組leader分配結果時的回撥函式,例如在StickyAssignor策略中就是通過這個方法儲存當前的分配方案,以備在下次消費組再平衡(rebalance)時可以提供分配參考依據。

 

介面中的name()方法用來提供分配策略的名稱,對於Kafka提供的3種分配策略而言,RangeAssignor對應的protocol_name為“range”,RoundRobinAssignor對應的protocol_name為“roundrobin”,StickyAssignor對應的protocol_name為“sticky”,所以自定義的分配策略中要注意命名的時候不要與已存在的分配策略發生衝突。這個命名用來標識分配策略的名稱,在後面所描述的加入消費組以及選舉消費組leader的時候會有涉及。

 

真正的分割槽分配方案的實現是在assign()方法中,方法中的引數metadata表示叢集的元資料資訊,而subscriptions表示消費組內各個消費者成員的訂閱資訊,最終方法返回各個消費者的分配資訊。

 

Kafka中還提供了一個抽象類
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以簡化PartitionAssignor介面的實現,對assign()方法進行了實現,其中會將Subscription中的userData資訊去掉後,在進行分配。Kafka提供的3種分配策略都是繼承自這個抽象類。如果開發人員在自定義分割槽分配策略時需要使用userData資訊來控制分割槽分配的結果,那麼就不能直接繼承AbstractPartitionAssignor這個抽象類,而需要直接實現PartitionAssignor介面。

  1. package org.apache.kafka.clients.consumer;
  2. import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
  3. import org.apache.kafka.common.TopicPartition;
  4. import java.util.*;
  5. public class MyAssignor extends AbstractPartitionAssignor {
  6. }

在使用時,消費者客戶端需要新增相應的Properties引數,示例如下:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, MyAssignor.class.getName());