Kafka: Testing the Producer and Consumer of a Kafka Cluster with kafka-python
阿新 • Published: 2021-07-15
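The previous two posts covered setting up Kafka, so today let's try something a little more advanced: testing how Kafka messages are consumed.
Kafka ships with shell scripts that can run this kind of test directly, but since I've been playing with Python lately, I'll write the test as a Python program instead.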
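1) First, install the kafka-python package.
I wanted to run it under Python 3, but Linux 7 ships with Python 2.7 by default.
You could install a Python 3.x alongside it on Linux, but then you would have to adjust yum (which depends on Python 2.7) and a pile of other things.
Besides, I'm used to debugging programs on Windows, so why not install Python 3.x on Windows and use it to reach my Linux virtual machines?
No sooner said than done: first install PyCharm, then Python 3.7.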
https://www.python.org/ftp/python/3.7.2/python-3.7.2.exe
Once Python is installed, run pip install kafka-python directly from CMD:
```
C:\Users\Lenovo>pip install kafka-python
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
Requirement already satisfied: kafka-python in d:\programs\python\python37\lib\site-packages (2.0.2)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)

C:\Users\Lenovo>pip list kafka
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
Package                Version
---------------------- -------
dnspython              1.16.0
kafka-python           2.0.2
mysql-connector-python 8.0.19
pip                    21.1.3
psutil                 5.8.0
pygame                 1.9.4
setuptools             41.2.0
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
WARNING: Ignoring invalid distribution -ip (d:\programs\python\python37\lib\site-packages)
```
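Since the plan is to reach Linux VMs from Windows, it can help to confirm that each broker port is reachable before running any Kafka code. Below is a minimal sketch using only the standard library; the broker list is the one used in Producer.py later in this post, and the helper names `parse_bootstrap` and `is_reachable` are my own, not part of kafka-python.

```python
import socket
from typing import List, Tuple

# Broker list copied from Producer.py in this post; adjust for your cluster.
BOOTSTRAP = "192.168.56.151:9092,192.168.56.152:9092,192.168.56.153:9092"


def parse_bootstrap(servers: str) -> List[Tuple[str, int]]:
    """Split a 'host:port,host:port' string into (host, port) tuples."""
    result = []
    for item in servers.split(","):
        host, _, port = item.strip().rpartition(":")
        result.append((host, int(port)))
    return result


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable network, ...
        return False


if __name__ == "__main__":
    for host, port in parse_bootstrap(BOOTSTRAP):
        status = "open" if is_reachable(host, port) else "UNREACHABLE"
        print("%s:%d -> %s" % (host, port, status))
```

An open port only proves TCP connectivity. Kafka clients also connect to the addresses each broker advertises (its `advertised.listeners` setting), so those addresses must be resolvable and reachable from the Windows machine as well.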
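2) In PyCharm, create a new Python project and point it at the local Python interpreter (not the default virtual environment), since that is where pip installed kafka-python above.
Create two Python files, Producer.py and Consumer.py.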
Producer.py
```python
#!/usr/bin/env python3
import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

topic = 'mytopic2'
producer = KafkaProducer(
    bootstrap_servers="192.168.56.151:9092,192.168.56.152:9092,192.168.56.153:9092")

msg_dict = {
    'method': "post",
    'header': 'json',
    'content': "17909",
}

for i in range(100, 1200):
    si = str(i)
    msg_dict['method'] = "post " + si
    msg_dict['content'] = si
    # Serialize the dict to JSON and encode it as UTF-8 bytes
    msg = json.dumps(msg_dict).encode(encoding='utf-8')
    print(msg)
    try:
        # send() is asynchronous; get() blocks until the broker acknowledges
        # the record (or 10 seconds pass) and returns its RecordMetadata
        future = producer.send(topic, msg)
        result = future.get(timeout=10)
        print(result)
        time.sleep(1)  # pace the test at roughly one message per second
    except KafkaTimeoutError as err:
        print(err)
    except Exception as err:
        print(err)

producer.close()
```
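Producer.py encodes each dict to JSON by hand. kafka-python's `KafkaProducer` also accepts a `value_serializer` callable, so the encoding can be handed to the producer. The sketch below is a hypothetical refactor along those lines, not the original program; `build_message`, `to_json_bytes` and `main` are names I made up, and kafka-python is imported inside `main()` so the helpers can be tried without a broker.

```python
import json
import time

# Same cluster and topic as Producer.py above.
BOOTSTRAP = ["192.168.56.151:9092", "192.168.56.152:9092", "192.168.56.153:9092"]
TOPIC = "mytopic2"


def build_message(i: int) -> dict:
    """Build the same payload Producer.py sends for sequence number i."""
    si = str(i)
    return {"method": "post " + si, "header": "json", "content": si}


def to_json_bytes(value: dict) -> bytes:
    """Serializer handed to KafkaProducer: dict -> UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")


def main() -> None:
    # Imported lazily so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP, value_serializer=to_json_bytes)
    try:
        for i in range(100, 1200):
            try:
                # The dict is passed as-is; the producer applies to_json_bytes.
                metadata = producer.send(TOPIC, build_message(i)).get(timeout=10)
                print(metadata.topic, metadata.partition, metadata.offset)
            except KafkaError as err:  # includes KafkaTimeoutError
                print(err)
            time.sleep(1)  # keep the one-message-per-second pacing
    finally:
        producer.close()


if __name__ == "__main__":
    main()
```

The bytes on the wire are unchanged: `to_json_bytes(build_message(100))` yields the same `b'{"method": "post 100", ...}'` payload that Producer.py prints.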
Consumer.py
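```python
#!/usr/bin/env python3
from kafka import KafkaConsumer

topic = 'mytopic2'
# Consumers that share the same group_id split the topic's partitions among themselves
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['192.168.56.151:9092', '192.168.56.152:9092', '192.168.56.153:9092'],
    group_id='mygroup2')

# Iterating over the consumer blocks and yields records as they arrive
for msg in consumer:
    # Print each record as topic:partition:offset: key=... value=...
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
```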
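Consumer.py relies on kafka-python's default `auto_offset_reset='latest'`: a group with no committed offsets starts at the end of each partition, so messages produced before the first consumer joined are skipped. The sketch below is a hypothetical variant, not the original program. It asks for `'earliest'` instead, decodes values with a `value_deserializer`, and closes the consumer on Ctrl+C. The helper names `from_json_bytes` and `format_record` are my own.

```python
import json

# Same cluster, topic and group as Consumer.py above.
BOOTSTRAP = ["192.168.56.151:9092", "192.168.56.152:9092", "192.168.56.153:9092"]
TOPIC = "mytopic2"
GROUP = "mygroup2"


def from_json_bytes(raw: bytes) -> dict:
    """Deserializer: UTF-8 JSON bytes -> dict."""
    return json.loads(raw.decode("utf-8"))


def format_record(record) -> str:
    """Same 'topic:partition:offset: key=... value=...' layout as Consumer.py."""
    return "%s:%d:%d: key=%s value=%s" % (
        record.topic, record.partition, record.offset, record.key, record.value)


def main() -> None:
    # Imported lazily so the helpers above work without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=BOOTSTRAP,
        group_id=GROUP,
        auto_offset_reset="earliest",  # only applies when the group has no committed offset
        value_deserializer=from_json_bytes,
    )
    try:
        for record in consumer:
            print(format_record(record))
    except KeyboardInterrupt:
        pass
    finally:
        # Close explicitly so the consumer commits and leaves the group
        # instead of waiting for the session to time out.
        consumer.close()


if __name__ == "__main__":
    main()
```

Note that `auto_offset_reset` has no effect once `mygroup2` has committed offsets; restarted consumers resume from the committed positions.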
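3) Running the programs
The producer sends one message per second: each send waits for the broker's acknowledgement via future.get(), then sleeps for 1 second.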
```
D:\Programs\Python\Python37\python.exe C:/Users/Lenovo/PycharmProjects/pythonProjectKafka2/Producer.py
b'{"method": "post 100", "header": "json", "content": "100"}'
RecordMetadata(topic='mytopic2', partition=1, topic_partition=TopicPartition(topic='mytopic2', partition=1), offset=0, timestamp=1626272823148, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=58, serialized_header_size=-1)
b'{"method": "post 101", "header": "json", "content": "101"}'
RecordMetadata(topic='mytopic2', partition=0, topic_partition=TopicPartition(topic='mytopic2', partition=0), offset=0, timestamp=1626272824211, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=58, serialized_header_size=-1)
b'{"method": "post 102", "header": "json", "content": "102"}'
```
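Blocking on `future.get()` for every message is what keeps the producer strictly sequential. kafka-python's documentation also shows an asynchronous style: attach `add_callback`/`add_errback` to the future returned by `send()`, then call `flush()` before exiting. The sketch below assumes you don't need the one-second pacing, so it sends the 1,100 messages as fast as the cluster accepts them. The callback and list names are hypothetical.

```python
import json

# Results collected by the callbacks; a real application might log or retry instead.
delivered = []
failed = []


def on_send_success(record_metadata) -> None:
    """Called once the broker has acknowledged a record."""
    delivered.append((record_metadata.topic, record_metadata.partition, record_metadata.offset))


def on_send_error(exc) -> None:
    """Called if the send ultimately fails (for example after a timeout)."""
    failed.append(repr(exc))


def main() -> None:
    # Imported lazily so the callbacks above can be exercised without kafka-python.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=["192.168.56.151:9092", "192.168.56.152:9092", "192.168.56.153:9092"],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for i in range(100, 1200):
        si = str(i)
        msg = {"method": "post " + si, "header": "json", "content": si}
        # send() returns a future immediately; the callbacks fire when the outcome is known.
        producer.send("mytopic2", msg).add_callback(on_send_success).add_errback(on_send_error)
    producer.flush()  # block until every buffered record has been sent or has failed
    producer.close()
    print(len(delivered), "delivered,", len(failed), "failed")


if __name__ == "__main__":
    main()
```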
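The consumer receives the messages.
When I started only one consumer, it received messages from all partitions (I configured the topic with 3 partitions).
After I started two more consumers in the same group (mygroup2), the group rebalanced, and each consumer ended up consuming from exactly one partition.
Note: in mytopic2:1:1 the fields are topic name : partition ID : offset, as the format string in Consumer.py shows.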
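The way partitions are shared out can be modelled in a few lines. The sketch below is a simplified, single-topic model of Kafka's "range" strategy, not kafka-python's actual code. It assumes `RangePartitionAssignor`, which as far as I know is the first entry in kafka-python's default `partition_assignment_strategy`. Members are sorted (in real clusters, by broker-generated member IDs), and the first `partitions % members` members get one extra partition.

```python
from typing import Dict, List


def range_assign(partitions: List[int], members: List[str]) -> Dict[str, List[int]]:
    """Simplified single-topic model of the 'range' assignment strategy.

    Members are sorted; each gets len(partitions) // len(members) consecutive
    partitions, and the first len(partitions) % len(members) members get one extra.
    """
    if not members:
        return {}
    parts = sorted(partitions)
    ordered = sorted(members)
    per_member, extra = divmod(len(parts), len(ordered))
    assignment = {}
    start = 0
    for idx, member in enumerate(ordered):
        count = per_member + (1 if idx < extra else 0)
        assignment[member] = parts[start:start + count]
        start += count
    return assignment


if __name__ == "__main__":
    # mytopic2 has 3 partitions; add consumers to the group one at a time.
    for n in (1, 2, 3, 4):
        members = ["consumer-%d" % k for k in range(1, n + 1)]
        print(n, range_assign([0, 1, 2], members))
```

With two consumers the model gives one member partitions 0 and 1 and the other partition 2, which matches the logs below (the first consumer kept partitions 0 and 1 while the second read partition 2). With three consumers, each gets one partition. A fourth consumer in the group would receive no partitions and sit idle.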
```
D:\Programs\Python\Python37\python.exe C:/Users/Lenovo/PycharmProjects/pythonProjectKafka2/Consumer.py
mytopic2:1:1: key=None value=b'{"method": "post 103", "header": "json", "content": "103"}'
mytopic2:1:2: key=None value=b'{"method": "post 104", "header": "json", "content": "104"}'
mytopic2:1:3: key=None value=b'{"method": "post 105", "header": "json", "content": "105"}'
mytopic2:2:0: key=None value=b'{"method": "post 106", "header": "json", "content": "106"}'
mytopic2:1:4: key=None value=b'{"method": "post 107", "header": "json", "content": "107"}'
mytopic2:2:1: key=None value=b'{"method": "post 108", "header": "json", "content": "108"}'
mytopic2:0:2: key=None value=b'{"method": "post 109", "header": "json", "content": "109"}'
mytopic2:0:3: key=None value=b'{"method": "post 110", "header": "json", "content": "110"}'
mytopic2:1:5: key=None value=b'{"method": "post 111", "header": "json", "content": "111"}'
mytopic2:1:6: key=None value=b'{"method": "post 112", "header": "json", "content": "112"}'
mytopic2:0:4: key=None value=b'{"method": "post 114", "header": "json", "content": "114"}'
mytopic2:0:5: key=None value=b'{"method": "post 115", "header": "json", "content": "115"}'
mytopic2:1:7: key=None value=b'{"method": "post 116", "header": "json", "content": "116"}'
mytopic2:1:8: key=None value=b'{"method": "post 117", "header": "json", "content": "117"}'
mytopic2:0:6: key=None value=b'{"method": "post 118", "header": "json", "content": "118"}'
mytopic2:0:7: key=None value=b'{"method": "post 120", "header": "json", "content": "120"}'
mytopic2:0:8: key=None value=b'{"method": "post 121", "header": "json", "content": "121"}'
mytopic2:1:9: key=None value=b'{"method": "post 122", "header": "json", "content": "122"}'
mytopic2:1:10: key=None value=b'{"method": "post 123", "header": "json", "content": "123"}'
mytopic2:0:9: key=None value=b'{"method": "post 126", "header": "json", "content": "126"}'
mytopic2:0:10: key=None value=b'{"method": "post 127", "header": "json", "content": "127"}'
mytopic2:0:11: key=None value=b'{"method": "post 129", "header": "json", "content": "129"}'
mytopic2:0:12: key=None value=b'{"method": "post 130", "header": "json", "content": "130"}'
mytopic2:0:13: key=None value=b'{"method": "post 137", "header": "json", "content": "137"}'
mytopic2:0:14: key=None value=b'{"method": "post 138", "header": "json", "content": "138"}'
mytopic2:0:15: key=None value=b'{"method": "post 139", "header": "json", "content": "139"}'
mytopic2:0:16: key=None value=b'{"method": "post 140", "header": "json", "content": "140"}'

Process finished with exit code -1
```
```
C:\Users\Lenovo\PycharmProjects\pythonProjectKafka2>python Consumer.py
mytopic2:2:2: key=None value=b'{"method": "post 113", "header": "json", "content": "113"}'
mytopic2:2:3: key=None value=b'{"method": "post 119", "header": "json", "content": "119"}'
mytopic2:2:4: key=None value=b'{"method": "post 124", "header": "json", "content": "124"}'
mytopic2:2:5: key=None value=b'{"method": "post 125", "header": "json", "content": "125"}'
mytopic2:1:11: key=None value=b'{"method": "post 131", "header": "json", "content": "131"}'
mytopic2:1:12: key=None value=b'{"method": "post 134", "header": "json", "content": "134"}'
mytopic2:1:13: key=None value=b'{"method": "post 144", "header": "json", "content": "144"}'
mytopic2:1:14: key=None value=b'{"method": "post 146", "header": "json", "content": "146"}'
```
```
C:\Users\Lenovo\PycharmProjects\pythonProjectKafka2>python Consumer.py
mytopic2:2:6: key=None value=b'{"method": "post 128", "header": "json", "content": "128"}'
mytopic2:2:7: key=None value=b'{"method": "post 132", "header": "json", "content": "132"}'
mytopic2:2:8: key=None value=b'{"method": "post 133", "header": "json", "content": "133"}'
mytopic2:2:9: key=None value=b'{"method": "post 135", "header": "json", "content": "135"}'
mytopic2:2:10: key=None value=b'{"method": "post 136", "header": "json", "content": "136"}'
mytopic2:2:11: key=None value=b'{"method": "post 141", "header": "json", "content": "141"}'
mytopic2:2:12: key=None value=b'{"method": "post 142", "header": "json", "content": "142"}'
mytopic2:2:13: key=None value=b'{"method": "post 143", "header": "json", "content": "143"}'
```
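To check which partitions each consumer actually read, you can parse the topic:partition:offset prefix back out of the console output and group offsets by partition. The helper below is my own sketch. Non-record lines such as the command prompt or "Process finished..." are skipped, so a whole console can be pasted in. The sample lines are copied verbatim from the second consumer's output above.

```python
import re
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Matches the "topic:partition:offset: key=..." prefix printed by Consumer.py.
LINE_RE = re.compile(r"^(?P<topic>[^:]+):(?P<partition>\d+):(?P<offset>\d+): key=")

# A few lines copied from the second consumer's output in this post.
SAMPLE_CONSUMER2 = """\
mytopic2:2:2: key=None value=b'{"method": "post 113", "header": "json", "content": "113"}'
mytopic2:2:5: key=None value=b'{"method": "post 125", "header": "json", "content": "125"}'
mytopic2:1:11: key=None value=b'{"method": "post 131", "header": "json", "content": "131"}'
mytopic2:1:14: key=None value=b'{"method": "post 146", "header": "json", "content": "146"}'
""".splitlines()


def parse_line(line: str) -> Optional[Tuple[str, int, int]]:
    """Return (topic, partition, offset), or None for non-record lines."""
    m = LINE_RE.match(line.strip())
    if m is None:
        return None
    return m.group("topic"), int(m.group("partition")), int(m.group("offset"))


def offsets_by_partition(lines: List[str]) -> Dict[int, List[int]]:
    """Group the offsets one consumer printed by partition, in printed order."""
    result = defaultdict(list)
    for line in lines:
        parsed = parse_line(line)
        if parsed is not None:
            _, partition, offset = parsed
            result[partition].append(offset)
    return dict(result)


if __name__ == "__main__":
    print(offsets_by_partition(SAMPLE_CONSUMER2))
```

For the second consumer this shows partition 2 first and partition 1 afterwards. This fits a second rebalance: once the third consumer joined, it took over partition 2 at offset 6, and the second consumer picked up partition 1 at offset 11, right after the first consumer's last partition-1 offset (10).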