
Kafka: Testing a Kafka Cluster's Producers and Consumers with kafka-python

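The previous two posts covered setting up Kafka. Today's post is a little more advanced.

Let's test how Kafka messages are consumed. Kafka ships with shell scripts that can run this kind of test directly, but since I have been playing with Python lately, I will write the test in Python instead.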

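1) First, install the kafka-python package.

I want to use this package with Python 3, but Linux 7 installs Python 2.7 by default.

Of course, you could install a 3.x version on Linux as well, but then you would have to adjust yum (which depends on Python 2.7) and a pile of other things.

Besides, I am used to debugging on Windows, so why not install 3.x on Windows and use it to reach my Linux virtual machines?

No sooner said than done: install PyCharm first, then Python 3.7.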

https://www.python.org/ftp/python/3.7.2/python-3.7.2.exe

Once Python is installed, run pip install kafka-python directly from a CMD prompt:

C:\Users\Lenovo>pip install kafka-python
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
Requirement already satisfied: kafka-python in d:\programs\python\python37\lib\site-packages (2.0.2)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)

C:\Users\Lenovo>pip list kafka
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
Package                Version
---------------------- -------
dnspython              1.16.0
kafka-python           2.0.2
mysql-connector-python 8.0.19
pip                    21.1.3
psutil                 5.8.0
pygame                 1.9.4
setuptools             41.2.0
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
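
As a quick sanity check after installing, one could also confirm from Python itself that the package is importable. This is only a sketch using the standard library; the helper name is_installed is my own, and kafka is the module name that kafka-python installs.

```python
import importlib.util


def is_installed(module_name):
    """Return True if the import system can locate `module_name`."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # kafka-python is imported under the module name `kafka`.
    print("kafka importable:", is_installed("kafka"))
```

If this prints False inside PyCharm but pip list shows the package, the project is probably using a different interpreter than the one pip installed into.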

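2) Create a new Python project in PyCharm and point it at the local Python environment (do not use the default virtual environment).

Create two Python files, Producer.py and Consumer.py.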

Producer.py

#!/usr/bin/env python3
import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

topic = 'mytopic2'
# Comma-separated list of the three brokers in the cluster.
producer = KafkaProducer(bootstrap_servers="192.168.56.151:9092,192.168.56.152:9092,192.168.56.153:9092")
msg_dict = {
    'method': "post",
    'header': 'json',
    'content': "17909",
}

for i in range(100, 1200):
    si = str(i)
    msg_dict['method'] = "post " + si
    msg_dict['content'] = si
    # Message values are sent as bytes, so serialize the dict to UTF-8 JSON.
    msg = json.dumps(msg_dict).encode(encoding='utf-8')
    print(msg)
    try:
        future = producer.send(topic, msg)
        # Block until the send is acknowledged, or give up after 10 seconds.
        result = future.get(timeout=10)
        print(result)
        time.sleep(1)  # pace the test at roughly one message per second
    except KafkaTimeoutError as err:
        print(err)
    except Exception as err:
        print(err)

producer.close()
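
The encoding step in the loop can be pulled out into a small function. The sketch below is only an illustration and runs without a broker; the name serialize_value is my own. As far as I know, KafkaProducer accepts a value_serializer argument that such a function could be passed to, but I have not changed the script above to do so.

```python
import json


def serialize_value(value):
    """Turn a dict into the UTF-8 JSON bytes used as the message value."""
    return json.dumps(value).encode("utf-8")


# Assumption: rather than encoding by hand in the loop, the function could be
# handed to the producer through its `value_serializer` argument, after which
# producer.send(topic, msg_dict) would take the dict directly.

if __name__ == "__main__":
    payload = serialize_value({"method": "post 100", "header": "json", "content": "100"})
    print(payload)
    print(len(payload))
```

The payload for message 100 is 58 bytes long, which matches the serialized_value_size=58 reported in the producer output further down.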

Consumer.py

#!/usr/bin/env python3
from kafka import KafkaConsumer

topic = 'mytopic2'
# Consumers that share a group_id split the topic's partitions among themselves.
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['192.168.56.151:9092', '192.168.56.152:9092', '192.168.56.153:9092'],
    group_id='mygroup2',
)
for msg in consumer:
    # Printed as topic:partition:offset, followed by the raw key and value.
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
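
The consumer prints the value as raw bytes. A minimal sketch of decoding it back into a dict follows. FakeRecord is a hypothetical stand-in that carries only the five fields the loop reads (topic, partition, offset, key, value), so the example runs without a broker, and describe is a name I made up.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for the record objects yielded by the consumer loop;
# it carries only the five fields that Consumer.py reads.
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "key", "value"])


def describe(record):
    """Decode the JSON payload and format the record much like Consumer.py does."""
    payload = json.loads(record.value.decode("utf-8"))
    return "%s:%d:%d: key=%s content=%s" % (
        record.topic, record.partition, record.offset, record.key, payload["content"])


if __name__ == "__main__":
    rec = FakeRecord("mytopic2", 1, 1, None,
                     b'{"method": "post 103", "header": "json", "content": "103"}')
    print(describe(rec))
```

KafkaConsumer also takes a value_deserializer argument, which would be another place to put the decoding step.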

3) Running the test

The producer generates one message per second.

D:\Programs\Python\Python37\python.exe C:/Users/Lenovo/PycharmProjects/pythonProjectKafka2/Producer.py
b'{"method": "post 100", "header": "json", "content": "100"}'
RecordMetadata(topic='mytopic2', partition=1, topic_partition=TopicPartition(topic='mytopic2', partition=1), offset=0, timestamp=1626272823148, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=58, serialized_header_size=-1)
b'{"method": "post 101", "header": "json", "content": "101"}'
RecordMetadata(topic='mytopic2', partition=0, topic_partition=TopicPartition(topic='mytopic2', partition=0), offset=0, timestamp=1626272824211, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=58, serialized_header_size=-1)
b'{"method": "post 102", "header": "json", "content": "102"}'

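The consumer receives the messages.

When I start only one consumer, it receives the messages from all partitions (I configured the topic with 3 partitions).

When I start 2 more consumers in the same group (mygroup2), the partitions are redistributed and each consumer ends up consuming data from only one partition.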
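
To make the idea concrete, here is a simplified range-style split of partitions across the members of one group. It is written purely for illustration and is not the client's actual assignment code; the function name and the consumer ids c1, c2, c3 are made up.

```python
def assign_partitions(partitions, consumers):
    """Split partitions across consumers in contiguous ranges.

    Simplified illustration: consumers are sorted, each one gets
    len(partitions) // len(consumers) partitions, and the first
    len(partitions) % len(consumers) consumers get one extra.
    """
    consumers = sorted(consumers)
    partitions = sorted(partitions)
    base, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    start = 0
    for index, consumer in enumerate(consumers):
        count = base + (1 if index < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


if __name__ == "__main__":
    # One consumer owns all three partitions.
    print(assign_partitions([0, 1, 2], ["c1"]))
    # Three consumers own one partition each.
    print(assign_partitions([0, 1, 2], ["c1", "c2", "c3"]))
```

With two consumers the split is uneven (two partitions for one, one for the other), which is roughly what the second consumer's output shows while the group is still settling.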

Note: mytopic2:1:1 stands for (topic name : partition id : offset). The consumer code makes this clear.

D:\Programs\Python\Python37\python.exe C:/Users/Lenovo/PycharmProjects/pythonProjectKafka2/Consumer.py
mytopic2:1:1: key=None value=b'{"method": "post 103", "header": "json", "content": "103"}'
mytopic2:1:2: key=None value=b'{"method": "post 104", "header": "json", "content": "104"}'
mytopic2:1:3: key=None value=b'{"method": "post 105", "header": "json", "content": "105"}'
mytopic2:2:0: key=None value=b'{"method": "post 106", "header": "json", "content": "106"}'
mytopic2:1:4: key=None value=b'{"method": "post 107", "header": "json", "content": "107"}'
mytopic2:2:1: key=None value=b'{"method": "post 108", "header": "json", "content": "108"}'
mytopic2:0:2: key=None value=b'{"method": "post 109", "header": "json", "content": "109"}'
mytopic2:0:3: key=None value=b'{"method": "post 110", "header": "json", "content": "110"}'
mytopic2:1:5: key=None value=b'{"method": "post 111", "header": "json", "content": "111"}'
mytopic2:1:6: key=None value=b'{"method": "post 112", "header": "json", "content": "112"}'
mytopic2:0:4: key=None value=b'{"method": "post 114", "header": "json", "content": "114"}'
mytopic2:0:5: key=None value=b'{"method": "post 115", "header": "json", "content": "115"}'
mytopic2:1:7: key=None value=b'{"method": "post 116", "header": "json", "content": "116"}'
mytopic2:1:8: key=None value=b'{"method": "post 117", "header": "json", "content": "117"}'
mytopic2:0:6: key=None value=b'{"method": "post 118", "header": "json", "content": "118"}'
mytopic2:0:7: key=None value=b'{"method": "post 120", "header": "json", "content": "120"}'
mytopic2:0:8: key=None value=b'{"method": "post 121", "header": "json", "content": "121"}'
mytopic2:1:9: key=None value=b'{"method": "post 122", "header": "json", "content": "122"}'
mytopic2:1:10: key=None value=b'{"method": "post 123", "header": "json", "content": "123"}'
mytopic2:0:9: key=None value=b'{"method": "post 126", "header": "json", "content": "126"}'
mytopic2:0:10: key=None value=b'{"method": "post 127", "header": "json", "content": "127"}'
mytopic2:0:11: key=None value=b'{"method": "post 129", "header": "json", "content": "129"}'
mytopic2:0:12: key=None value=b'{"method": "post 130", "header": "json", "content": "130"}'
mytopic2:0:13: key=None value=b'{"method": "post 137", "header": "json", "content": "137"}'
mytopic2:0:14: key=None value=b'{"method": "post 138", "header": "json", "content": "138"}'
mytopic2:0:15: key=None value=b'{"method": "post 139", "header": "json", "content": "139"}'
mytopic2:0:16: key=None value=b'{"method": "post 140", "header": "json", "content": "140"}'

Process finished with exit code -1
C:\Users\Lenovo\PycharmProjects\pythonProjectKafka2>python Consumer.py
mytopic2:2:2: key=None value=b'{"method": "post 113", "header": "json", "content": "113"}'
mytopic2:2:3: key=None value=b'{"method": "post 119", "header": "json", "content": "119"}'
mytopic2:2:4: key=None value=b'{"method": "post 124", "header": "json", "content": "124"}'
mytopic2:2:5: key=None value=b'{"method": "post 125", "header": "json", "content": "125"}'
mytopic2:1:11: key=None value=b'{"method": "post 131", "header": "json", "content": "131"}'
mytopic2:1:12: key=None value=b'{"method": "post 134", "header": "json", "content": "134"}'
mytopic2:1:13: key=None value=b'{"method": "post 144", "header": "json", "content": "144"}'
mytopic2:1:14: key=None value=b'{"method": "post 146", "header": "json", "content": "146"}'
C:\Users\Lenovo\PycharmProjects\pythonProjectKafka2>python Consumer.py
mytopic2:2:6: key=None value=b'{"method": "post 128", "header": "json", "content": "128"}'
mytopic2:2:7: key=None value=b'{"method": "post 132", "header": "json", "content": "132"}'
mytopic2:2:8: key=None value=b'{"method": "post 133", "header": "json", "content": "133"}'
mytopic2:2:9: key=None value=b'{"method": "post 135", "header": "json", "content": "135"}'
mytopic2:2:10: key=None value=b'{"method": "post 136", "header": "json", "content": "136"}'
mytopic2:2:11: key=None value=b'{"method": "post 141", "header": "json", "content": "141"}'
mytopic2:2:12: key=None value=b'{"method": "post 142", "header": "json", "content": "142"}'
mytopic2:2:13: key=None value=b'{"method": "post 143", "header": "json", "content": "143"}'
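
To check which partitions a given consumer window actually read, the printed lines can be tallied by their topic:partition:offset prefix. The following is a rough sketch; the regular expression and the function name are my own, and the sample lines are copied from the output above.

```python
import re
from collections import Counter

# Matches the "topic:partition:offset:" prefix printed by Consumer.py.
LINE_RE = re.compile(r"^(?P<topic>[^:]+):(?P<partition>\d+):(?P<offset>\d+):")


def count_by_partition(lines):
    """Count how many printed records came from each partition."""
    counts = Counter()
    for line in lines:
        match = LINE_RE.match(line)
        if match:  # prompts and other non-record lines are skipped
            counts[int(match.group("partition"))] += 1
    return counts


if __name__ == "__main__":
    sample = """\
mytopic2:2:5: key=None value=b'{"method": "post 125", "header": "json", "content": "125"}'
mytopic2:1:11: key=None value=b'{"method": "post 131", "header": "json", "content": "131"}'
mytopic2:2:6: key=None value=b'{"method": "post 128", "header": "json", "content": "128"}'
Process finished with exit code -1"""
    print(dict(count_by_partition(sample.splitlines())))
```

Running it over the full output of each window is a quick way to confirm that, once all three consumers are up, each window only reports a single partition.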