Kafka offset commit mechanism
The Kafka consumer relies on one crucial mechanism: the offset. Even if a consumer crashes mid-consumption, or a rebalance reassigns its partitions, the offset tells the group exactly where to resume the next time consumption restarts. It works like a bookmark: the bookmark (offset) lets you quickly find where to pick up reading (consuming) again.
Kafka offers two ways of committing offsets: (1) automatic commit (the default), and (2) manual commit (which gives flexible control over offsets).
(1) Automatic offset commit:
Automatic offset committing in Kafka is controlled by the parameters enable_auto_commit and auto_commit_interval_ms. Committed offsets are stored in the internal __consumer_offsets topic, which has 50 partitions by default; the partition holding a given group's offsets is determined by hashing the group id.
For example, with group_id=test_group_1, partition = hash("test_group_1") % 50 = 28.
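As a rough sketch of that mapping (illustrative only, not the broker's actual code): the broker computes a non-negative Java-style hash of the group id, modulo the number of __consumer_offsets partitions. The helper below reimplements Java's String.hashCode in Python, and the partition count of 50 assumes the broker default offsets.topic.num.partitions; the exact result may differ from the 28 quoted above depending on broker version and hashing details.

NUM_OFFSETS_PARTITIONS = 50  # broker default: offsets.topic.num.partitions

def java_string_hashcode(s):
    """Reimplementation of Java's 32-bit String.hashCode."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h  # reinterpret as signed 32-bit

def offsets_partition_for(group_id):
    # The broker masks the hash to a non-negative value rather than taking abs().
    return (java_string_hashcode(group_id) & 0x7FFFFFFF) % NUM_OFFSETS_PARTITIONS

print(offsets_partition_for("test_group_1"))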
Example of automatic offset committing:
import pickle
import uuid
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    max_poll_records=500,
    enable_auto_commit=True,  # default True: commit offsets automatically
    auto_commit_interval_ms=100,  # auto-commit frequency in ms; default is 5000 ms
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)

# Subscribe to the topic round_topic
consumer.subscribe(topics=('round_topic',))

try:
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=1000)

        # consumer.assignment() returns the assigned TopicPartitions;
        # consumer.position() gives the next offset to consume for each one
        for partition in consumer.assignment():
            print('topic: {} partition: {}, consumption starts from offset: {}'.format(
                str(partition.topic),
                str(partition.partition),
                consumer.position(partition)
            ))

        # Processing logic
        for k, record_list in consumer_records_dict.items():
            print(k)
            for record in record_list:
                print("topic = {},partition = {},offset = {},key = {},value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value)
                )

finally:
    # Calling close() also triggers an automatic offset commit; close() defaults to autocommit=True
    consumer.close()
In the code above, the final call to consumer.close() also triggers an automatic commit, because close() defaults to autocommit=True. The source:
def close(self, autocommit=True):
    """Close the consumer, waiting indefinitely for any needed cleanup.

    Keyword Arguments:
        autocommit (bool): If auto-commit is configured for this consumer,
            this optional flag causes the consumer to attempt to commit any
            pending consumed offsets prior to close. Default: True
    """
    if self._closed:
        return
    log.debug("Closing the KafkaConsumer.")
    self._closed = True
    self._coordinator.close(autocommit=autocommit)
    self._metrics.close()
    self._client.close()
    try:
        self.config['key_deserializer'].close()
    except AttributeError:
        pass
    try:
        self.config['value_deserializer'].close()
    except AttributeError:
        pass
    log.debug("The KafkaConsumer has closed.")
With automatic committing, if auto_commit_interval_ms is set too large and the consumer exits abnormally before the next auto-commit fires, the offsets of already-processed messages are never committed, and those messages are consumed again on restart. For example (assumed numbers), with the default 5000 ms interval, a consumer handling 1,000 messages per second could replay up to roughly 5,000 messages after a crash. So smaller values of auto_commit_interval_ms are preferable, as they narrow this duplicate-consumption window, though at the cost of more frequent commit requests.
(2) Manual offset commit:
Because automatic committing is inflexible and imprecise (offsets can only be committed at the configured interval), Kafka also provides a manual commit strategy. Manual commits give finer, more precise control over offsets, helping ensure that messages are neither consumed twice nor lost.
There are three main ways to commit offsets manually: 1. synchronous commit; 2. asynchronous commit; 3. a combination of asynchronous and synchronous commits.
1. Synchronous manual commit
In synchronous mode, a failed commit is retried until it either succeeds or hits an unrecoverable error. The consumer thread is also blocked while pulling messages: it waits until the broker responds to the commit request, either with success or with an exception, which limits message throughput.
1 """
2 同步的方式10W條訊息 4.58s
3 """
4
5 import pickle
6 import uuid
7 import time
8 from kafka import KafkaConsumer
9
10 consumer = KafkaConsumer(
11 bootstrap_servers=['192.168.33.11:9092'],
12 group_id="test_group_1",
13 client_id="{}".format(str(uuid.uuid4())),
14 enable_auto_commit=False, # 設定為手動提交偏移量.
15 key_deserializer=lambda k: pickle.loads(k),
16 value_deserializer=lambda v: pickle.loads(v)
17 )
18
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',))
21
22 try:
23 start_time = time.time()
24 while True:
25 consumer_records_dict = consumer.poll(timeout_ms=100) # 在輪詢中等待的毫秒數
26 print("獲取下一輪")
27
28 record_num = 0
29 for key, record_list in consumer_records_dict.items():
30 for record in record_list:
31 record_num += 1
32 print("---->當前批次獲取到的訊息個數是:{}<----".format(record_num))
33 record_num = 0
34
35 for k, record_list in consumer_records_dict.items():
36 for record in record_list:
37 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
38 record.topic, record.partition, record.offset, record.key, record.value)
39 )
40
41 try:
42 # 輪詢一個batch 手動提交一次
43 consumer.commit() # 提交當前批次最新的偏移量. 會阻塞 執行完後才會下一輪poll
44 end_time = time.time()
45 time_counts = end_time - start_time
46 print(time_counts)
47 except Exception as e:
48 print('commit failed', str(e))
49
50 finally:
51 consumer.close() # 手動提交中close對偏移量提交沒有影響
As shown above, each polled batch is followed by one manual commit, and the next poll only runs once the current batch's commit has completed; in testing, 100,000 messages took 4.58s.
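A plain consumer.commit() commits the offsets returned by the last poll. kafka-python also accepts an explicit offsets dict, which lets you commit exactly what you have processed. Below is a minimal sketch that could slot into the polling loop above; it assumes kafka-python 2.0.x, where OffsetAndMetadata is an (offset, metadata) pair (its field layout may differ in other versions).

from kafka.structs import OffsetAndMetadata

# The keys of the dict returned by poll() are TopicPartition objects,
# so they can be reused directly as commit keys. The offset to commit
# is the last processed record's offset + 1 (the next record to read).
offsets_to_commit = {}
for tp, record_list in consumer_records_dict.items():
    if record_list:
        last_record = record_list[-1]
        offsets_to_commit[tp] = OffsetAndMetadata(last_record.offset + 1, '')

if offsets_to_commit:
    consumer.commit(offsets=offsets_to_commit)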
2. Asynchronous manual commit + callback
With asynchronous manual commits, the consumer thread does not block, failed commits are not retried, and a callback can be attached to record error details when the broker responds.
1 """
2 非同步的方式手動提交偏移量(非同步+回撥函式的模式) 10W條訊息 3.09s
3 """
4
5 import pickle
6 import uuid
7 import time
8 from kafka import KafkaConsumer
9
10 consumer = KafkaConsumer(
11 bootstrap_servers=['192.168.33.11:9092'],
12 group_id="test_group_1",
13 client_id="{}".format(str(uuid.uuid4())),
14 enable_auto_commit=False, # 設定為手動提交偏移量.
15 key_deserializer=lambda k: pickle.loads(k),
16 value_deserializer=lambda v: pickle.loads(v)
17 )
18
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',))
21
22
23 def _on_send_response(*args, **kwargs):
24 """
25 提交偏移量涉及回撥函式
26 :param args: args[0] --> {TopicPartition:OffsetAndMetadata} args[1] --> Exception
27 :param kwargs:
28 :return:
29 """
30 if isinstance(args[1], Exception):
31 print('偏移量提交異常. {}'.format(args[1]))
32 else:
33 print('偏移量提交成功')
34
35
36 try:
37 start_time = time.time()
38 while True:
39 consumer_records_dict = consumer.poll(timeout_ms=10)
40
41 record_num = 0
42 for key, record_list in consumer_records_dict.items():
43 for record in record_list:
44 record_num += 1
45 print("當前批次獲取到的訊息個數是:{}".format(record_num))
46
47 for record_list in consumer_records_dict.values():
48 for record in record_list:
49 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
50 record.topic, record.partition, record.offset, record.key, record.value))
51
52 # 避免頻繁提交
53 if record_num != 0:
54 try:
55 consumer.commit_async(callback=_on_send_response)
56 except Exception as e:
57 print('commit failed', str(e))
58
59 record_num = 0
60
61 finally:
62 consumer.close()
About the callback arguments: args[0] is a dict whose keys are TopicPartition and whose values are OffsetAndMetadata, i.e. the committed offset for each partition of the topic; args[1] is True when the commit succeeds and an Exception instance when it fails.
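For reference, here is a minimal sketch of a callback that unpacks that dict (the wording of the log lines is my own, and the two positional parameters are assumed to arrive as described above):

def _on_send_response(offsets, response):
    """Log each partition's committed offset, or the failure."""
    if isinstance(response, Exception):
        print('offset commit failed: {}'.format(response))
        return
    for tp, offset_meta in offsets.items():
        print('committed {}[{}] -> offset {}'.format(
            tp.topic, tp.partition, offset_meta.offset))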
Because asynchronous commits are never retried, if the consumer shuts down abnormally or a rebalance is triggered before a commit has gone through, the uncommitted offsets are lost and those messages will be consumed again.
3. Combined asynchronous + synchronous commits
To address the offset loss of purely asynchronous commits, the consumer commits each batch asynchronously and then commits synchronously once when shutting down. Even if the last asynchronous commit failed, the synchronous commit can make up for it, retrying until the commit succeeds.
1 """
2 同步和非同步組合的方式提交偏移量
3 """
4
5 import pickle
6 import uuid
7 import time
8 from kafka import KafkaConsumer
9
10 consumer = KafkaConsumer(
11 bootstrap_servers=['192.168.33.11:9092'],
12 group_id="test_group_1",
13 client_id="{}".format(str(uuid.uuid4())),
14 enable_auto_commit=False, # 設定為手動提交偏移量.
15 key_deserializer=lambda k: pickle.loads(k),
16 value_deserializer=lambda v: pickle.loads(v)
17 )
18
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',))
21
22
23 def _on_send_response(*args, **kwargs):
24 """
25 提交偏移量涉及的回撥函式
26 :param args:
27 :param kwargs:
28 :return:
29 """
30 if isinstance(args[1], Exception):
31 print('偏移量提交異常. {}'.format(args[1]))
32 else:
33 print('偏移量提交成功')
34
35
36 try:
37 start_time = time.time()
38 while True:
39 consumer_records_dict = consumer.poll(timeout_ms=100)
40
41 record_num = 0
42 for key, record_list in consumer_records_dict.items():
43 for record in record_list:
44 record_num += 1
45 print("---->當前批次獲取到的訊息個數是:<----".format(record_num))
46 record_num = 0
47
48 for k, record_list in consumer_records_dict.items():
49 print(k)
50 for record in record_list:
51 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
52 record.topic, record.partition, record.offset, record.key, record.value)
53 )
54
55 try:
56 # 輪詢一個batch 手動提交一次
57 consumer.commit_async(callback=_on_send_response)
58 end_time = time.time()
59 time_counts = end_time - start_time
60 print(time_counts)
61 except Exception as e:
62 print('commit failed', str(e))
63
64 except Exception as e:
65 print(str(e))
66 finally:
67 try:
68 # 同步提交偏移量,在消費者異常退出的時候再次提交偏移量,確保偏移量的提交.
69 consumer.commit()
70 print("同步補救提交成功")
71 except Exception as e:
72 consumer.close()
The finally block runs whether or not an exception occurred, so consumer.commit() performs one synchronous make-up commit in every case and the offsets are not lost; the nested finally then closes the consumer regardless of whether that last commit succeeded.