1. 程式人生 > >Kafka消費者組再均衡問題

Kafka消費者組再均衡問題

在Kafka中,當有新消費者加入或者訂閱的topic數發生變化時,會觸發Rebalance(再均衡:在同一個消費者組當中,分割槽的所有權從一個消費者轉移到另外一個消費者)機制,Rebalance顧名思義就是重新均衡消費者消費。Rebalance的過程如下:

第一步:所有成員都向coordinator傳送請求,請求入組。一旦所有成員都發送了請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員資訊以及訂閱資訊發給leader。

第二步:leader開始分配消費方案,指明具體哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案發給coordinator。coordinator接收到分配方案之後會把方案發給各個consumer,這樣組內的所有成員就都知道自己應該消費哪些分割槽了。

所以對於Rebalance來說,Coordinator起著至關重要的作用,那麼怎麼檢視消費者對應的Coordinator呢,我們知道某個消費者組對應__consumer_offsets中的哪個Partation是通過hash計算出來的:partation=hash("test_group_1")%50=28,表示test_group_1這個消費者組屬於28號partation,通過命令:

 

./kafka-topics.sh --zookeeper 192.168.33.11:2181 --describe --topic __consumer_offsets

可以找到28號Partation所對應的資訊:

從而可以知道coordinator對應的broker為1

 

在Rebalance期間,消費者會出現無法讀取訊息,造成整個消費者群組一段時間內不可用,假設現在消費者組當中有A,程式碼邏輯執行10s,如果消費者組在消費的過程中consumer B加入到了該消費者組,並且B的程式碼邏輯執行20s,那麼當A處理完後先進入Rebalance狀態等待,只有當B也處理完後,A和B才真正通過Rebalance重新分配,這樣顯然A在等待的過程中浪費了資源。

消費者A:

 1 """
 2 consumer_rebalance_a.py a消費者
 3 """
 4 import pickle
5 import uuid 6 import time 7 from kafka import KafkaConsumer 8 from kafka.structs import TopicPartition, OffsetAndMetadata 9 from kafka import ConsumerRebalanceListener 10 11 consumer = KafkaConsumer( 12 bootstrap_servers=['192.168.33.11:9092'], 13 group_id="test_group_1", 14 client_id="{}".format(str(uuid.uuid4())), 15 enable_auto_commit=False, 16 key_deserializer=lambda k: pickle.loads(k), 17 value_deserializer=lambda v: pickle.loads(v) 18 ) 19 20 # 用來記錄最新的偏移量資訊. 21 consumer_offsets = {} 22 23 24 class MineConsumerRebalanceListener(ConsumerRebalanceListener): 25 def on_partitions_revoked(self, revoked): 26 """ 27 再均衡開始之前 下一輪poll之前觸發 28 :param revoked: 29 :return: 30 """ 31 print('再均衡開始之前被自動觸發.') 32 print(revoked, type(revoked)) 33 consumer.commit_async(offsets=consumer_offsets) 34 35 def on_partitions_assigned(self, assigned): 36 """ 37 再均衡完成之後 即將下一輪poll之前 觸發 38 :param assigned: 39 :return: 40 """ 41 print('在均衡完成之後自動觸發.') 42 print(assigned, type(assigned)) 43 44 45 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener()) 46 47 48 def _on_send_response(*args, **kwargs): 49 """ 50 提交偏移量涉及回撥函式 51 :param args: 52 :param kwargs: 53 :return: 54 """ 55 if isinstance(args[1], Exception): 56 print('偏移量提交異常. {}'.format(args[1])) 57 else: 58 print('偏移量提交成功') 59 60 61 try: 62 start_time = time.time() 63 while True: 64 # 再均衡其實是在poll之前完成的 65 consumer_records_dict = consumer.poll(timeout_ms=100) 66 67 # 處理邏輯. 68 for k, record_list in consumer_records_dict.items(): 69 for record in record_list: 70 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 71 record.topic, record.partition, record.offset, record.key, record.value) 72 ) 73 74 consumer_offsets[ 75 TopicPartition(record.topic, record.partition) 76 ] = OffsetAndMetadata( 77 record.offset + 1, metadata='偏移量.' 78 ) 79 80 try: 81 consumer.commit_async(callback=_on_send_response) 82 time.sleep(10) 83 except Exception as e: 84 print('commit failed', str(e)) 85 86 except Exception as e: 87 print(str(e)) 88 finally: 89 try: 90 # 同步提交偏移量,在消費者異常退出的時候再次提交偏移量,確保偏移量的提交. 91 consumer.commit() 92 print("同步補救提交成功") 93 except Exception as e: 94 consumer.close()

 消費者B:

  1 """
  2 consumer b.py 消費者B
  3 """
  4 
  5 import pickle
  6 import uuid
  7 import time
  8 from kafka import KafkaConsumer
  9 from kafka.structs import TopicPartition, OffsetAndMetadata
 10 from kafka import ConsumerRebalanceListener
 11 
 12 consumer = KafkaConsumer(
 13     bootstrap_servers=['192.168.33.11:9092'],
 14     group_id="test_group_1",
 15     client_id="{}".format(str(uuid.uuid4())),
 16     enable_auto_commit=False,  # 設定為手動提交偏移量.
 17     key_deserializer=lambda k: pickle.loads(k),
 18     value_deserializer=lambda v: pickle.loads(v)
 19 )
 20 
 21 consumer_offsets = {}  # 用來記錄最新的偏移量資訊.
 22 
 23 
 24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
 25     def on_partitions_revoked(self, revoked):
 26         """
 27         再均衡開始之前 下一輪poll之前觸發
 28         :param revoked:
 29         :return:
 30         """
 31         print('再均衡開始之前被自動觸發.')
 32         print(revoked, type(revoked))
 33         consumer.commit_async(offsets=consumer_offsets)
 34 
 35     def on_partitions_assigned(self, assigned):
 36         """
 37         再均衡完成之後  即將下一輪poll之前 觸發
 38         :param assigned:
 39         :return:
 40         """
 41 
 42         print('在均衡完成之後自動觸發.')
 43         print(assigned, type(assigned))
 44 
 45 
 46 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener())
 47 
 48 
 49 def _on_send_response(*args, **kwargs):
 50     """
 51     提交偏移量涉及回撥函式
 52     :param args: 
 53     :param kwargs:
 54     :return:
 55     """
 56 
 57     if isinstance(args[1], Exception):
 58         print('偏移量提交異常. {}'.format(args[1]))
 59     else:
 60         print('偏移量提交成功')
 61 
 62 
 63 try:
 64     start_time = time.time()
 65     while True:
 66         # 再均衡其實是在poll之前完成的
 67         consumer_records_dict = consumer.poll(timeout_ms=100)
 68 
 69         record_num = 0
 70         for key, record_list in consumer_records_dict.items():
 71             for record in record_list:
 72                 record_num += 1
 73         print("---->當前批次獲取到的訊息個數是:{}".format(record_num))
 74 
 75         # 處理邏輯.
 76         for k, record_list in consumer_records_dict.items():
 77             for record in record_list:
 78                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
 79                     record.topic, record.partition, record.offset, record.key, record.value)
 80                 )
 81 
 82                 consumer_offsets[
 83                     TopicPartition(record.topic, record.partition)
 84                 ] = OffsetAndMetadata(record.offset + 1, metadata='偏移量.')
 85 
 86         try:
 87             # 輪詢一個batch 手動提交一次
 88             consumer.commit_async(callback=_on_send_response)
 89             time.sleep(20)
 90         except Exception as e:
 91             print('commit failed', str(e))
 92 
 93 except Exception as e:
 94     print(str(e))
 95 finally:
 96     try:
 97         # 同步提交偏移量,在消費者異常退出的時候再次提交偏移量,確保偏移量的提交.
 98         consumer.commit()
 99         print("同步補救提交成功")
100     except Exception as e:
101         consumer.close()

消費者A和消費者B是同一個消費者組(test_group_1)的兩個消費者,用time.sleep的方式模擬執行時間,A:10s,B:20s;首先A開始消費,當B新加入消費者組的時候會觸發Rebalance,可以通過實現再均衡監聽器(RebalanceListener)中的on_partitions_revoked和on_partitions_assigned方法來檢視再均衡觸發前後的partition變化情況,依次啟動消費者A和B之後:

消費者A:
再均衡開始之前被自動觸發.
{TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1), TopicPartition(topic='round_topic', partition=2)} <class 'set'>
<----------------------------------------
---------------------------------------->
在均衡完成之後自動觸發.
{TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1)} <class 'set'>
<----------------------------------------


消費者B:
再均衡開始之前被自動觸發.
set() <class 'set'>
<----------------------------------------
---------------------------------------->
在均衡完成之後自動觸發.
{TopicPartition(topic='round_topic', partition=2)} <class 'set'>
<----------------------------------------

在等待B的邏輯執行完後,A和B進入再均衡狀態;再均衡前A處於partition 0、1、 2三個分割槽,B不佔有任何partition;當再均衡結束後,A佔有partition 0、1,B佔有partition 2;然後A和B分別開始消費對應的partition。

在上述消費者A和B的程式碼中重寫了RebalanceListener,主要是為了在發生再均衡之前提交最後一個已經處理記錄的偏移量,因為再均衡時消費者將失去對一個分割槽的所有權,如果消費者已經消費了當前partition還沒提交offset,這時候發生再均衡會使得消費者重新分配partition,可能使得同一個訊息先後被兩個消費者消費的情況,實現MineConsumerRebalanceListener再均衡前提交一次offset,確保每一個消費者在觸發再均衡前提交最後一次offset:

 1 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
 2     def on_partitions_revoked(self, revoked):
 3         """
 4         再均衡開始之前 下一輪poll之前觸發
 5         :param revoked:
 6         :return:
 7         """
 8         print('再均衡開始之前被自動觸發.')
 9         print(revoked, type(revoked))
10         consumer.commit_async(offsets=consumer_offsets)
11 
12     def on_partitions_assigned(self, assigned):
13         """
14         再均衡完成之後  即將下一輪poll之前 觸發
15         :param assigned:
16         :return:
17         """
18 
19         print('在均衡完成之後自動觸發.')
20         print(assigned, type(assigned))

 

再均衡發生的場景有以下幾種:

1. 組成員發生變更(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了)
2. 訂閱主題數發生變更,如果你使用了正則表示式的方式進行訂閱,那麼新建匹配正則表示式的topic就會觸發rebalance
3. 訂閱主題的分割槽數發生變更
鑑於觸發再均衡後會造成資源浪費的問題,所以我們儘量不要觸發再均衡