1. 程式人生 > 其它 >python-kafka手動提交訊息測試

python-kafka手動提交訊息測試

傳送方程式碼

import json
import time
import traceback
from kafka import KafkaProducer
from kafka.errors import kafka_errors

if __name__ == '__main__':
    producer = KafkaProducer(
        bootstrap_servers=['192.168.32.10:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer
=lambda v: json.dumps(v).encode()) # 傳送三條訊息 for i in range(0, 3): time.sleep(2) data = { "filePath": "filePath", "dataName": "2222", "index": i } future = producer.send( 'kafka_test', # 同一個key值,會被送至同一個分割槽 key=11, value
=data) # 向分割槽1傳送訊息 print("send {}".format(data)) try: future.get(timeout=100) # 監控是否傳送成功 except kafka_errors: # 傳送失敗丟擲kafka_errors traceback.format_exc()

接收方程式碼

import json
import time
from threading import Thread
from kafka import KafkaConsumer


def
parse(con, value): try: i = value["index"] print(i, time.ctime(), "start") time.sleep(10 - i * 3) # if i == 0: # pass # else: # con.commit() print(i, time.ctime(), "end") except ZeroDivisionError: print("除 0 ") con.commit() except Exception as e: print(e) finally: print("---") if __name__ == '__main__': consumer = KafkaConsumer( bootstrap_servers=['192.168.32.10:9092'], group_id='test', auto_offset_reset='latest', value_deserializer=lambda m: json.loads(m.decode()), enable_auto_commit=False ) consumer.subscribe(topics=["kafka_test"]) while True: fetch_data_dict = consumer.poll(timeout_ms=100, max_records=1) # print("msg=====", fetch_data_dict) for keys, values in fetch_data_dict.items(): for i in values: th = Thread(target=parse, args=(consumer, i.value)) th.start()

測試場景一:

  設定enable_auto_commit=False後,傳送方傳送3條訊息,接收方不手動commit(), 則下次重啟消費方會重複消費上次未提交的資料

測試場景二:

設想:三條訊息,每條訊息消費的時長不一致,假設第一條10s, 第二條7s,第三條4s, kafka的消費是按照順序的,我們啟動三個執行緒對應消費三條訊息。可想而知第三條消費最快,提交後會阻塞吧?因為此時第二條,第一條還沒提交。

  結論:不會阻塞,查了文件不知道為啥。。大家知道可以留言討論

測試場景三:

  還是啟動三個執行緒對應消費三條訊息,parse方法如下,第一條消費失敗,第二條第三條正常消費提交,此時消費方重啟,還會消費幾次?

def parse(con, value):
    i = value["index"]
    print(i, time.ctime(), "start")
    # time.sleep(10 - i * 3)
    print(10/i)
    # if i == 0:
    #     pass
    # else:
    #     con.commit()
    con.commit()
    print(i, time.ctime(), "end")

  結論: 不會消費了,第一條被提交了

測試場景四:

  啟動三個執行緒對應消費三條訊息,第一條不提交,第二條第三條正常消費提交,此時消費方重啟,還會消費嗎?

  結論: 不會消費了,第一條被提交了

測試場景五:

  不啟動執行緒,同步消費,,第一條消費失敗,第二條第三條正常消費提交,此時消費方重啟,還會消費嗎?

  結論:第一,二,三會再次消費

測試場景六:

  不啟動執行緒,同步消費,第一條不提交,第二條第三條正常消費提交,此時消費方重啟,還會消費嗎?

  結論:第一,二,三會再次消費

可見啟動執行緒非同步消費的話,部分失敗的訊息會被成功的訊息commit掉,同步消費則不會!