使用Python操作Kafka:KafkaProducer、KafkaConsumer
阿新 • • 發佈:2021-06-10
其實很早就是用kafka了,但是一直都用的非常簡單,最近寫一個小功能的時候又要用到kafka,於是就花點時間好好看了一下網上關於kafka的一些文件和部落格,發現了一個很不錯的部落格,做個記錄和分享。
原文連結: https://www.cnblogs.com/rexcheny/articles/9463979.html
作者好像是阿里員工,他在這一篇部落格中對於一個常用的引數都做了詳細的解釋,並寫了一個類可以直接使用,非常感謝。
單執行緒生產者
說是單執行緒,其實並不是,你啟動一個生產者其實是2個執行緒,後臺有一個IO執行緒用於真正傳送訊息出去,前臺有一個執行緒用於把訊息傳送到本地緩衝區。
#!/usr/bin/env python # Author: rex.cheny # E-mail: [email protected] importtime importrandom importsys fromkafkaimportKafkaProducer fromkafka.errorsimportKafkaError, KafkaTimeoutError importjson """ KafkaProducer是釋出訊息到Kafka叢集的客戶端,它是執行緒安全的並且共享單一生產者例項。生產者包含一個帶有緩衝區的池, 用於儲存還沒有傳送到Kafka叢集的訊息記錄以及一個後臺IO執行緒,該執行緒將這些留在緩衝區的訊息記錄傳送到Kafka叢集中。 """ """ KafkaProducer建構函式引數解釋 - acks 0表示傳送不理睬傳送是否成功;1表示需要等待leader成功寫入日誌才返回;all表示所有副本都寫入日誌才返回 - buffer_memory 預設33554432也就是32M,該引數用於設定producer用於快取訊息的緩衝區大小,如果採用非同步傳送訊息,那麼 生產者啟動後會建立一個記憶體緩衝區用於存放待發送的訊息,然後由專屬執行緒來把放在緩衝區的訊息進行真正傳送, 如果要給生產者要給很多分割槽發訊息那麼就需要考慮這個引數的大小防止過小降低吞吐量 - compression_type 是否啟用壓縮,預設是none,可選型別為gzip、lz4、snappy三種。壓縮會降低網路IO但是會增加生產者端的CPU 消耗。另外如果broker端的壓縮設定和生產者不同那麼也會給broker帶來重新解壓縮和重新壓縮的CPU負擔。 - retries 重試次數,當訊息傳送失敗後會嘗試幾次重發。預設為0,一般考慮到網路抖動或者分割槽的leader切換,而不是服務端 真的故障所以可以設定重試3次。 - retry_backoff_ms 每次重試間隔多少毫秒,預設100毫秒。 - max_in_flight_requests_per_connection 生產者會將多個傳送請求快取在記憶體中,預設是5個,如果你開啟了重試,也就是設定了 retries引數,那麼將可能導致針對於同一分割槽的訊息出現順序錯亂。為了防止這種情況 需要把該引數設定為1,來保障同分區的訊息順序。 - batch_size 對於調優生產者吞吐量和延遲效能指標有重要的作用。buffer_memeory可以看做池子,而這個batch_size可以看做池子裡 裝有訊息的小盒子。這個值預設16384也就是16K,其實不大。生產者會把發往同一個分割槽的訊息放在一個batch中,當batch 滿了就會發送裡面的訊息,但是也不一定非要等到滿了才會發。這個數值大那麼生產者吞吐量高但是效能低因為盒子太大佔用記憶體 傳送的時候這個資料量也就大。如果你設定成1M,那麼顯然生產者的吞吐量要比16K高的多。 - linger_ms 上面說batch沒有填滿也可以傳送,那顯然有一個時間控制,就是這個引數,預設是0毫秒,這個引數就是用於控制訊息傳送延遲 多久的。預設是立即傳送,無需關係batch是否填滿。大多數場景我們希望立即傳送,但是這也降低了吞吐量。 - max_request_size 最大請求大小,可以理解為一條訊息記錄的最大大小,預設是1048576位元組。 - request_timeout_ms 生產者傳送訊息後,broker需要在規定時間內將處理結果返回給生產者,那個這個時間長度就是這個引數 控制的,預設30000,也就是30秒。如果broker在30秒內沒有給生產者響應,那麼生產者就會認為請求超時,並在回撥函式 中進行特殊處理,或者進行重試。 """ classProducer(object): def__init__(self, KafkaServerList=['127.0.0.1:9092'], ClientId="Procucer01", Topic='Test'): self._kwargs={ "bootstrap_servers": KafkaServerList, "client_id": ClientId, "acks":1, "buffer_memory":33554432, 'compression_type':None, "retries":3, "batch_size":1048576, "linger_ms":100, "key_serializer":lambdam: json.dumps(m).encode('utf-8'), "value_serializer":lambdam: json.dumps(m).encode('utf-8'), } self._topic=Topic try: self._producer=KafkaProducer(**self._kwargs) exceptException as err: print(err) def_onSendSucess(self, record_metadata): """ 非同步傳送成功回撥函式,也就是真正傳送到kafka叢集且成功才會執行。傳送到緩衝區不會執行回撥方法。 :param record_metadata: :return: """ print("傳送成功") print("被髮往的主題:", record_metadata.topic) print("被髮往的分割槽:", record_metadata.partition) print("佇列位置:", record_metadata.offset)# 這個偏移量是相對偏移量,也就是相對起止位置,也就是佇列偏移量。 def_onSendFailed(self): print("傳送失敗") defsendMessage(self, value=None, partition=None): ifnotvalue: returnNone # 傳送的訊息必須是序列化後的,或者是位元組 # message = json.dumps(msg, encoding='utf-8', ensure_ascii=False) kwargs={ "value": value,# value 必須必須為位元組或者被序列化為位元組,由於之前我們初始化時已經通過value_serializer來做了,所以我上面的語句就註釋了 "key":None,# 與value對應的鍵,可選,也就是把一個鍵關聯到這個訊息上,KEY相同就會把訊息傳送到同一分割槽上,所以如果有這個要求就可以設定KEY,也需要序列化 "partition": partition# 傳送到哪個分割槽,整型。如果不指定將會自動分配。 } try: # 非同步傳送,傳送到緩衝區,同時註冊兩個回撥函式,一個是傳送成功的回撥,一個是傳送失敗的回撥。 # send函式是有返回值的是RecordMetadata,也就是記錄的元資料,包括主題、分割槽、偏移量 future=self._producer.send(self._topic,**kwargs).add_callback(self._onSendSucess).add_errback(self._onSendFailed) print("傳送訊息:", value) # 註冊回撥也可以這樣寫,上面的寫法就是為了簡化 # future.add_callback(self._onSendSucess) # future.add_errback(self._onSendFailed) exceptKafkaTimeoutError as err: print(err) exceptException as err: print(err) defcloseConnection(self, timeout=None): # 關閉生產者,可以指定超時時間,也就是等待關閉成功最多等待多久。 self._producer.close(timeout=timeout) defsendNow(self, timeout=None): # 呼叫flush()函式可以放所有在緩衝區的訊息記錄立即傳送,即使ligner_ms值大於0. # 這時候後臺傳送訊息執行緒就會開始立即傳送訊息並且阻塞在這裡,等待訊息傳送成功,當然是否阻塞取決於acks的值。 # 如果不呼叫flush函式,那麼什麼時候傳送訊息取決於ligner_ms或者batch任意一個條件滿足就會發送。 try: self._producer.flush(timeout=timeout) exceptKafkaTimeoutError as err: print(err) exceptException as err: print(err) defmain(): p=Producer(KafkaServerList=["172.16.42.156:9092"], ClientId="Procucer01", Topic="TESTTOPIC") foriinrange(10): time.sleep(1) closePrice=random.randint(1,500) msg={ "Publisher":"Procucer01", "股票程式碼":60000+i "昨日收盤價": closePrice, "今日開盤價":0, "今日收盤價":0, } p.sendMessage(value=msg) # p.sendNow() p.closeConnection() if__name__=="__main__": try: main() finally: sys.exit()
單執行緒消費者(手動拉取訊息)
#!/usr/bin/env python # -*- coding: utf-8 -*- importsys fromkafkaimportKafkaConsumer importjson classConsumer(object): def__init__(self, KafkaServerList=['172.16.48.171:9092'], GroupID='TestGroup', ClientId="Test", Topics=['Test',]): """ 用於設定消費者配置資訊,這些配置項可以從原始碼中找到,下面為必要引數。 :param KafkaServerList: kafka伺服器IP:PORT 列表 :param GroupID: 消費者組ID :param ClientId: 消費者名稱 :param Topic: 主題 """ """ 初始化一個消費者例項,消費者不是執行緒安全的,所以建議一個執行緒實現一個消費者,而不是一個消費者讓多個執行緒共享 下面這些是可選引數,可以在初始化KafkaConsumer例項的時候傳遞進去 enable_auto_commit 是否自動提交,預設是true auto_commit_interval_ms 自動提交間隔毫秒數 auto_offset_reset="earliest" 重置偏移量,earliest移到最早的可用訊息,latest最新的訊息,預設為latest """ self._kwargs={ "bootstrap_servers": KafkaServerList, "client_id": ClientId, "group_id": GroupID, "enable_auto_commit":False, "auto_offset_reset":"latest", "key_deserializer":lambdam: json.loads(m.decode('utf-8')), "value_deserializer":lambdam: json.loads(m.decode('utf-8')), } try: self._consumer=KafkaConsumer(**self._kwargs) self._consumer.subscribe(topics=(Topics)) exceptException as err: print("Consumer init failed, %s"%err) defconsumeMsg(self): try: whileTrue: data=self._consumer.poll(timeout_ms=5, max_records=100)# 拉取訊息,字典型別 ifdata: forkeyindata: consumerrecord=data.get(key)[0]# 返回的是ConsumerRecord物件,可以通過字典的形式獲取內容。 ifconsumerrecord !=None: # 訊息消費邏輯 message={ "Topic": consumerrecord.topic, "Partition": consumerrecord.partition, "Offset": consumerrecord.offset, "Key": consumerrecord.key, "Value": consumerrecord.value } print(message) # 消費邏輯執行完畢後在提交偏移量 self._consumer.commit() else: print("%s consumerrecord is None."%key) exceptException as err: print(err) defmain(): try: c=Consumer(KafkaServerList=['192.168.51.193:9092'], Topics=['EEE888']) c.consumeMsg() exceptException as err: print(err) if__name__=="__main__": try: main() finally: sys.exit()
非手動拉取訊息
#!/usr/bin/env python # -*- coding: utf-8 -*- importsys fromkafkaimportKafkaConsumer importjson classConsumer(object): def__init__(self, KafkaServerList=['172.16.48.171:9092'], GroupID='TestGroup', ClientId="Test", Topics=['Test',]): """ 用於設定消費者配置資訊,這些配置項可以從原始碼中找到,下面為必要引數。 :param KafkaServerList: kafka伺服器IP:PORT 列表 :param GroupID: 消費者組ID :param ClientId: 消費者名稱 :param Topic: 主題 """ """ 初始化一個消費者例項,消費者不是執行緒安全的,所以建議一個執行緒實現一個消費者,而不是一個消費者讓多個執行緒共享 下面這些是可選引數,可以在初始化KafkaConsumer例項的時候傳遞進去 enable_auto_commit 是否自動提交,預設是true auto_commit_interval_ms 自動提交間隔毫秒數 auto_offset_reset="earliest" 重置偏移量,earliest移到最早的可用訊息,latest最新的訊息,預設為latest """ self._kwargs={ "bootstrap_servers": KafkaServerList, "client_id": ClientId, "group_id": GroupID, "enable_auto_commit":False, "auto_offset_reset":"latest", "key_deserializer":lambdam: json.loads(m.decode('utf-8')), "value_deserializer":lambdam: json.loads(m.decode('utf-8')), } try: self._consumer=KafkaConsumer(**self._kwargs) self._consumer.subscribe(topics=(Topics)) exceptException as err: print("Consumer init failed, %s"%err) defconsumeMsg(self): try: whileTrue: forconsumerrecordinself._consumer: ifconsumerrecord: message={ "Topic": consumerrecord.topic, "Partition": consumerrecord.partition, "Offset": consumerrecord.offset, "Key": consumerrecord.key, "Value": consumerrecord.value } print(message) self._consumer.commit() exceptException as err: print(err) defmain(): try: c=Consumer(KafkaServerList=['192.168.51.193:9092'], Topics=['EEE888']) c.consumeMsg() exceptException as err: print(err) if__name__=="__main__": try: main() finally: sys.exit()
Python API的幫助文件:https://kafka-python.readthedocs.io/en/master/usage.html