1. 程式人生 > 實用技巧 >Python呼叫Kafka傳送資料

Python呼叫Kafka傳送資料

來源於 https://www.cnblogs.com/FG123/p/10091478.html

Kafka是一種分散式的基於釋出/訂閱的訊息系統,它的高吞吐量、靈活的offset是其它訊息系統所沒有的。

Kafka傳送訊息主要有三種方式:

1.傳送並忘記 2.同步傳送 3.非同步傳送+回撥函式

下面以單節點的方式分別用三種方法傳送1w條訊息測試:

方式一:傳送並忘記(不關心訊息是否正常到達,對返回結果不做任何判斷處理)

傳送並忘記的方式本質上也是一種非同步的方式,只是它不會獲取訊息傳送的返回結果,這種方式的吞吐量是最高的,但是無法保證訊息的可靠性:

 1 import pickle
 2 import time
 3 from kafka import KafkaProducer
 4 
 5 producer = KafkaProducer(bootstrap_servers=['192.168.33.11:9092'],
 6                          key_serializer=lambda k: pickle.dumps(k),
 7                          value_serializer=lambda v: pickle.dumps(v))
 8 
 9 start_time = time.time()
10 for i in range(0, 10000):
11     print('------{}---------'.format(i))
12     future = producer.send('test_topic', key='num', value=i, partition=0)
13 
14 # 將緩衝區的全部訊息push到broker當中
15 producer.flush()
16 producer.close()
17 
18 end_time = time.time()
19 time_counts = end_time - start_time
20 print(time_counts)

測試結果:1.88s

方式二:同步傳送(通過get方法等待Kafka的響應,判斷訊息是否傳送成功)

以同步的方式傳送訊息時,一條一條的傳送,對每條訊息返回的結果判斷, 可以明確地知道每條訊息的傳送情況,但是由於同步的方式會阻塞,只有當訊息通過get返回future物件時,才會繼續下一條訊息的傳送:

 1 import pickle
 2 import time
 3 from kafka import KafkaProducer
 4 from kafka.errors import kafka_errors
 5 
 6 producer = KafkaProducer(
 7     bootstrap_servers=['192.168.33.11:9092'],
 8     key_serializer=lambda k: pickle.dumps(k),
 9     value_serializer=lambda v: pickle.dumps(v)
10 )
11 
12 start_time = time.time()
13 for i in range(0, 10000):
14     print('------{}---------'.format(i))
15     future = producer.send(topic="test_topic", key="num", value=i)
16     # 同步阻塞,通過呼叫get()方法進而保證一定程式是有序的.
17     try:
18         record_metadata = future.get(timeout=10)
19         # print(record_metadata.topic)
20         # print(record_metadata.partition)
21         # print(record_metadata.offset)
22     except kafka_errors as e:
23         print(str(e))
24 
25 end_time = time.time()
26 time_counts = end_time - start_time
27 print(time_counts)

測試結果:16s

方式三:非同步傳送+回撥函式(訊息以非同步的方式傳送,通過回撥函式返回訊息傳送成功/失敗)

在呼叫send方法傳送訊息的同時,指定一個回撥函式,伺服器在返回響應時會呼叫該回調函式,通過回撥函式能夠對異常情況進行處理,當呼叫了回撥函式時,只有回撥函式執行完畢生產者才會結束,否則一直會阻塞:

 1 import pickle
 2 import time
 3 from kafka import KafkaProducer
 4 
 5 producer = KafkaProducer(
 6     bootstrap_servers=['192.168.33.11:9092'],
 7     key_serializer=lambda k: pickle.dumps(k),
 8     value_serializer=lambda v: pickle.dumps(v)
 9 )
10 
11 
12 def on_send_success(*args, **kwargs):
13     """
14     傳送成功的回撥函式
15     :param args:
16     :param kwargs:
17     :return:
18     """
19     return args
20 
21 
22 def on_send_error(*args, **kwargs):
23     """
24     傳送失敗的回撥函式
25     :param args:
26     :param kwargs:
27     :return:
28     """
29 
30     return args
31 
32 
33 start_time = time.time()
34 for i in range(0, 10000):
35     print('------{}---------'.format(i))
36     # 如果成功,傳進record_metadata,如果失敗,傳進Exception.
37     producer.send(
38         topic="test_topic", key="num", value=i
39     ).add_callback(on_send_success).add_errback(on_send_error)
40 
41 producer.flush()
42 producer.close()
43 
44 end_time = time.time()
45 time_counts = end_time - start_time
46 print(time_counts)

測試結果:2.15s

三種方式雖然在時間上有所差別,但並不是說時間越快的越好,具體要看業務的應用場景:

場景1:如果業務要求訊息必須是按順序傳送的,那麼可以使用同步的方式,並且只能在一個partation上,結合引數設定retries的值讓傳送失敗時重試,設定max_in_flight_requests_per_connection=1,可以控制生產者在收到伺服器晌應之前只能傳送1個訊息,從而控制訊息順序傳送;

場景2:如果業務只關心訊息的吞吐量,容許少量訊息傳送失敗,也不關注訊息的傳送順序,那麼可以使用傳送並忘記的方式,並配合引數acks=0,這樣生產者不需要等待伺服器的響應,以網路能支援的最大速度傳送訊息;

場景3:如果業務需要知道訊息傳送是否成功,並且對訊息的順序不關心,那麼可以用非同步+回撥的方式來發送訊息,配合引數retries=0,並將傳送失敗的訊息記錄到日誌檔案中;