1. 程式人生 > 程式設計 >python3 互動操作 kafka 之 kafka python

python3 互動操作 kafka 之 kafka python

Github地址

github.com/dpkp/kafka-…

kafka-python庫的官網

pypi.org/project/kaf…

kafka-python官網檔案

kafka-python.readthedocs.io/en/master/

使用pip3安裝kafka-python

在閱讀kafka-python檔案會說明很多安裝的方式,這裡採用pip3的安裝方式。 pip3 install kafka-python

D:\pythonProject\kafka_test>pip3 install kafka-python
Collecting kafka-python
  Downloading https://files.pythonhosted.org/packages/82/39/aebe3ad518513bbb2260dd84ac21e5c30af860cc4c95b32acbd64b9d9d0d/kafka_python-1.4.6-py2.py3-none-any.whl (259kB)
    100% |████████████████████████████████| 266kB 123kB/s
Installing collected packages: kafka-python
Successfully installed kafka-python-1.4.6
You are using pip version 18.1,however version 19.1.1 is available.
You should consider upgrading via the 'python -m pip install --upgrade pip'
command. D:\pythonProject\kafka_test> 複製程式碼

好了,這樣就安裝完成。下面就根據檔案示例執行一下。 當然,這個執行之前首先要安裝好kafka的環境。

先別急著操作,先來看看這個kafka-python庫客戶端的相關說明。

Kafka Python客戶端

用於Apache Kafka分散式流處理系統的Python客戶端。kafka-python的功能與官方java客戶端非常相似,帶有多個pythonic介面(例如,消費者迭代器)。

kafka-python最適用於較新的代理broker(0.9+),但與舊版本(向0.8.0)向後相容。某些功能僅在較新的代理上啟用。例如,完全協調的消費者群體 - 如果向同一群體中的多個消費者分配動態分割槽 - 需要使用0.9+ kafka broker。為早期的代理髮布支援此功能需要編寫和維護自定義領導選舉和成員/健康檢查程式碼(可能使用zookeeper或consul)。對於較舊的代理,您可以通過使用諸如chef,ansible等配置管理工具為每個消費者例項手動分配不同的分割槽來實現類似的功能。這種方法可以正常工作,但它不支援故障時的重新平衡。見<

kafka-python.readthedocs.io/en/master/c… >瞭解更多詳情。

請注意,主分支可能包含未釋出的功能。有關釋出文件,請參閱readthedocs和/或python的內聯幫助。

>>> pip install kafka-python
複製程式碼

看了上面的說明之後,心裡大概有了一些概念了,下面來進行一下生產者和消費者的呼叫示例看看。

注意:在開始呼叫之前,首先要配置好kafka的遠端呼叫,避免除錯老是報錯的坑。 如果不清楚kafka如何配置遠端呼叫,可以訪問這裡

KafkaProducer

二話不說,直接按照官方檔案寫出一個示例,如下:

from kafka import KafkaProducer
from time import sleep

def start_producer():
    producer = KafkaProducer(bootstrap_servers='192.168.196.129:9092'
) for i in range(0,100000): msg = 'msg is ' + str(i) producer.send('my_favorite_topic2',msg.encode('utf-8')) sleep(3) if __name__ == '__main__': start_producer() 複製程式碼

執行啟動服務如下:

執行起來之後,生產者迴圈傳送訊息給kafka,這裡我沒有列印返回的結果。下面來看看消費者端是怎麼處理的。

KafkaConsumer

上面的程式我一直執行生產者不斷髮送訊息,下面我這邊就執行開啟消費者接收最新的訊息。

from kafka import KafkaConsumer
import time

def start_consumer():
    consumer = KafkaConsumer('my_favorite_topic2',bootstrap_servers = '192.168.196.129:9092')
    for msg in consumer:
        print(msg)
        print("topic = %s" % msg.topic) # topic default is string
        print("partition = %d" % msg.offset)
        print("value = %s" % msg.value.decode()) # bytes to string
        print("timestamp = %d" % msg.timestamp)
        print("time = ",time.strftime("%Y-%m-%d %H:%M:%S",time.localtime( msg.timestamp/1000 )) )

if __name__ == '__main__':
    start_consumer()
複製程式碼

執行如下:

從上圖可以看到,消費者通過迴圈就可以不斷接收訊息進行處理,另外我還對訊息的內容進行了相關的拆分解析。