Python # pykafka - Producer and Consume
阿新 • • 發佈:2017-09-03
nbsp syn balance off from span range hosts 0.10
###
python版本:2.7.13
pykafka版本:2.6.0
註明:python 3.6.2版本會報錯。
備註:這個是一個通過pykafka模塊向kafka生產數據
Github地址:https://github.com/Parsely/pykafka
Pykafka Doc:http://pykafka.readthedocs.io/en/latest/usage.html
producer.py
# -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts="192.168.0.100:9092") # 可接受多個client print client.topics # 查看所有的topic topic = client.topics[‘test‘] # 選擇一個topic message = "test message test message" # 當有了topic之後呢,可以創建一個producer,來發消息,生產kafka數據,通過字符串形式 with topic.get_sync_producer() as producer: for i in range(4): producer.produce(‘test message ‘ + str(i ** 2))
###
consume.py
# -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts=‘192.168.0.100:9092‘) topic=client.topics[‘test‘] balanced_consumer = topic.get_balanced_consumer( consumer_group=‘test_kafka_group‘, auto_commit_enable=False, # 設置為False的時候不需要添加consumer_group,直接連接topic即可取到消息zookeeper_connect=‘192.168.0.100:2181‘#這裏可以連接多個zk ) for message in balanced_consumer: # print message if message is not None: print message.offset, message.value #打印接收到的消息體的偏移個數和值
###
Python # pykafka - Producer and Consume