1. 程式人生 > >Kafka提交offset機制

Kafka提交offset機制

在kafka的消費者中,有一個非常關鍵的機制,那就是offset機制。它使得Kafka在消費的過程中即使掛了或者引發再均衡問題重新分配Partation,當下次重新恢復消費時仍然可以知道從哪裡開始消費。它好比看一本書中的書籤標記,每次通過書籤標記(offset)就能快速找到該從哪裡開始看(消費)。

Kafka對於offset的處理有兩種提交方式:(1) 自動提交(預設的提交方式)   (2) 手動提交(可以靈活地控制offset)

(1) 自動提交偏移量:

Kafka中偏移量的自動提交是由引數enable_auto_commit和auto_commit_interval_ms

控制的,當enable_auto_commit=True時,Kafka在消費的過程中會以頻率為auto_commit_interval_ms向Kafka自帶的topic(__consumer_offsets)進行偏移量提交,具體提交到哪個Partation是以演算法:partation=hash(group_id)%50來計算的。

如:group_id=test_group_1,則partation=hash("test_group_1")%50=28

自動提交偏移量示例:

 1 import pickle
 2 import uuid
 3 from kafka import KafkaConsumer
4 5 consumer = KafkaConsumer( 6 bootstrap_servers=['192.168.33.11:9092'], 7 group_id="test_group_1", 8 client_id="{}".format(str(uuid.uuid4())), 9 max_poll_records=500, 10 enable_auto_commit=True, # 預設為True 表示自動提交偏移量 11 auto_commit_interval_ms=100, # 控制自動提交偏移量的頻率 單位ms 預設是5000ms
12 key_deserializer=lambda k: pickle.loads(k), 13 value_deserializer=lambda v: pickle.loads(v) 14 ) 15 16 # 訂閱消費round_topic這個主題 17 consumer.subscribe(topics=('round_topic',)) 18 19 try: 20 while True: 21 consumer_records_dict = consumer.poll(timeout_ms=1000) 22 23 # consumer.assignment()可以獲取每個分割槽的offset 24 for partition in consumer.assignment(): 25 print('主題:{} 分割槽:{},需要從下面的offset開始消費:{}'.format( 26 str(partition.topic), 27 str(partition.partition), 28 consumer.position(partition) 29 )) 30 31 # 處理邏輯. 32 for k, record_list in consumer_records_dict.items(): 33 print(k) 34 for record in record_list: 35 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 36 record.topic, record.partition, record.offset, record.key, record.value) 37 ) 38 39 finally: 40 # 呼叫close方法的時候會觸發偏移量的自動提交 close預設autocommit=True 41 consumer.close()

 返回結果:

在上述程式碼中,最後呼叫consumer.close()時候也會觸發自動提交,因為它預設autocommit=True,原始碼如下:

 1     def close(self, autocommit=True):
 2         """Close the consumer, waiting indefinitely for any needed cleanup.
 3 
 4         Keyword Arguments:
 5             autocommit (bool): If auto-commit is configured for this consumer,
 6                 this optional flag causes the consumer to attempt to commit any
 7                 pending consumed offsets prior to close. Default: True
 8         """
 9         if self._closed:
10             return
11         log.debug("Closing the KafkaConsumer.")
12         self._closed = True
13         self._coordinator.close(autocommit=autocommit)
14         self._metrics.close()
15         self._client.close()
16         try:
17             self.config['key_deserializer'].close()
18         except AttributeError:
19             pass
20         try:
21             self.config['value_deserializer'].close()
22         except AttributeError:
23             pass
24         log.debug("The KafkaConsumer has closed.")

 

對於自動提交偏移量,如果auto_commit_interval_ms的值設定的過大,當消費者在自動提交偏移量之前異常退出,將導致kafka未提交偏移量,進而出現重複消費的問題,所以建議auto_commit_interval_ms的值越小越好。

 

(2) 手動提交偏移量:

鑑於Kafka自動提交offset的不靈活性和不精確性(只能是按指定頻率的提交),Kafka提供了手動提交offset策略。手動提交能對偏移量更加靈活精準地控制,以保證訊息不被重複消費以及訊息不被丟失。

對於手動提交offset主要有3種方式:1.同步提交  2.非同步提交  3.非同步+同步 組合的方式提交

 1.同步手動提交偏移量

同步模式下提交失敗的時候一直嘗試提交,直到遇到無法重試的情況下才會結束,同時同步方式下消費者執行緒在拉取訊息會被阻塞,在broker對提交的請求做出響應之前,會一直阻塞直到偏移量提交操作成功或者在提交過程中發生異常,限制了訊息的吞吐量。

 1 """
 2 同步的方式10W條訊息  4.58s
 3 """
 4 
 5 import pickle
 6 import uuid
 7 import time
 8 from kafka import KafkaConsumer
 9 
10 consumer = KafkaConsumer(
11     bootstrap_servers=['192.168.33.11:9092'],
12     group_id="test_group_1",
13     client_id="{}".format(str(uuid.uuid4())),
14     enable_auto_commit=False,  # 設定為手動提交偏移量.
15     key_deserializer=lambda k: pickle.loads(k),
16     value_deserializer=lambda v: pickle.loads(v)
17 )
18 
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',))
21 
22 try:
23     start_time = time.time()
24     while True:
25         consumer_records_dict = consumer.poll(timeout_ms=100)  # 在輪詢中等待的毫秒數
26         print("獲取下一輪")
27 
28         record_num = 0
29         for key, record_list in consumer_records_dict.items():
30             for record in record_list:
31                 record_num += 1
32         print("---->當前批次獲取到的訊息個數是:{}<----".format(record_num))
33         record_num = 0
34 
35         for k, record_list in consumer_records_dict.items():
36             for record in record_list:
37                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
38                     record.topic, record.partition, record.offset, record.key, record.value)
39                 )
40 
41         try:
42             # 輪詢一個batch 手動提交一次
43             consumer.commit()  # 提交當前批次最新的偏移量. 會阻塞  執行完後才會下一輪poll
44             end_time = time.time()
45             time_counts = end_time - start_time
46             print(time_counts)
47         except Exception as e:
48             print('commit failed', str(e))
49 
50 finally:
51     consumer.close()  # 手動提交中close對偏移量提交沒有影響

 

從上述可以看出,每輪循一個批次,手動提交一次,只有當前批次的訊息提交完成時才會觸發poll來獲取下一輪的訊息,經測試10W條訊息耗時4.58s

 2.非同步手動提交偏移量+回撥函式

 非同步手動提交offset時,消費者執行緒不會阻塞,提交失敗的時候也不會進行重試,並且可以配合回撥函式在broker做出響應的時候記錄錯誤資訊。

 

 1 """
 2 非同步的方式手動提交偏移量(非同步+回撥函式的模式) 10W條訊息 3.09s
 3 """
 4 
 5 import pickle
 6 import uuid
 7 import time
 8 from kafka import KafkaConsumer
 9 
10 consumer = KafkaConsumer(
11     bootstrap_servers=['192.168.33.11:9092'],
12     group_id="test_group_1",
13     client_id="{}".format(str(uuid.uuid4())),
14     enable_auto_commit=False,  # 設定為手動提交偏移量.
15     key_deserializer=lambda k: pickle.loads(k),
16     value_deserializer=lambda v: pickle.loads(v)
17 )
18 
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',))
21 
22 
23 def _on_send_response(*args, **kwargs):
24     """
25     提交偏移量涉及回撥函式
26     :param args: args[0] --> {TopicPartition:OffsetAndMetadata}  args[1] --> Exception
27     :param kwargs:
28     :return:
29     """
30     if isinstance(args[1], Exception):
31         print('偏移量提交異常. {}'.format(args[1]))
32     else:
33         print('偏移量提交成功')
34 
35 
36 try:
37     start_time = time.time()
38     while True:
39         consumer_records_dict = consumer.poll(timeout_ms=10)
40 
41         record_num = 0
42         for key, record_list in consumer_records_dict.items():
43             for record in record_list:
44                 record_num += 1
45         print("當前批次獲取到的訊息個數是:{}".format(record_num))
46 
47         for record_list in consumer_records_dict.values():
48             for record in record_list:
49                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
50                     record.topic, record.partition, record.offset, record.key, record.value))
51 
52         # 避免頻繁提交
53         if record_num != 0:
54             try:
55                 consumer.commit_async(callback=_on_send_response)
56             except Exception as e:
57                 print('commit failed', str(e))
58 
59         record_num = 0
60 
61 finally:
62     consumer.close()

對於args引數:args[0]是一個dict,key是TopicPartition,value是OffsetAndMetadata,表示該主題下的partition對應的offset;args[1]在提交成功是True,提交失敗時是一個Exception類。

對於非同步提交,由於不會進行失敗重試,當消費者異常關閉或者觸發了再均衡前,如果偏移量還未提交就會造成偏移量丟失。

 3.非同步+同步 組合的方式提交偏移量

針對非同步提交偏移量丟失的問題,通過對消費者進行非同步批次提交併且在關閉時同步提交的方式,這樣即使上一次的非同步提交失敗,通過同步提交還能夠進行補救,同步會一直重試,直到提交成功。

 1 """
 2 同步和非同步組合的方式提交偏移量
 3 """
 4 
 5 import pickle
 6 import uuid
 7 import time
 8 from kafka import KafkaConsumer
 9 
10 consumer = KafkaConsumer(
11     bootstrap_servers=['192.168.33.11:9092'],
12     group_id="test_group_1",
13     client_id="{}".format(str(uuid.uuid4())),
14     enable_auto_commit=False,  # 設定為手動提交偏移量.
15     key_deserializer=lambda k: pickle.loads(k),
16     value_deserializer=lambda v: pickle.loads(v)
17 )
18 
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',))
21 
22 
23 def _on_send_response(*args, **kwargs):
24     """
25     提交偏移量涉及的回撥函式
26     :param args:
27     :param kwargs:
28     :return:
29     """
30     if isinstance(args[1], Exception):
31         print('偏移量提交異常. {}'.format(args[1]))
32     else:
33         print('偏移量提交成功')
34 
35 
36 try:
37     start_time = time.time()
38     while True:
39         consumer_records_dict = consumer.poll(timeout_ms=100)
40 
41         record_num = 0
42         for key, record_list in consumer_records_dict.items():
43             for record in record_list:
44                 record_num += 1
45         print("---->當前批次獲取到的訊息個數是:<----".format(record_num))
46         record_num = 0
47 
48         for k, record_list in consumer_records_dict.items():
49             print(k)
50             for record in record_list:
51                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
52                     record.topic, record.partition, record.offset, record.key, record.value)
53                 )
54 
55         try:
56             # 輪詢一個batch 手動提交一次
57             consumer.commit_async(callback=_on_send_response)
58             end_time = time.time()
59             time_counts = end_time - start_time
60             print(time_counts)
61         except Exception as e:
62             print('commit failed', str(e))
63 
64 except Exception as e:
65     print(str(e))
66 finally:
67     try:
68         # 同步提交偏移量,在消費者異常退出的時候再次提交偏移量,確保偏移量的提交.
69         consumer.commit()
70         print("同步補救提交成功")
71     except Exception as e:
72         consumer.close()

通過finally在最後不管是否異常都會觸發consumer.commit()來同步補救一次,確保偏移量不會丟失