1. 程式人生 > >Python # pykafka - Producer and Consume

Python # pykafka - Producer and Consume

nbsp syn balance off from span range hosts 0.10

###

python版本:2.7.13

pykafka版本:2.6.0

註明:python 3.6.2版本會報錯。

備註:這個是一個通過pykafka模塊向kafka生產數據

Github地址:https://github.com/Parsely/pykafka

Pykafka Doc:http://pykafka.readthedocs.io/en/latest/usage.html

producer.py

# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.0.100:9092")
# 可接受多個client

print client.topics
# 查看所有的topic

topic = client.topics[‘test‘]
# 選擇一個topic

message = "test message test message"
# 當有了topic之後呢,可以創建一個producer,來發消息,生產kafka數據,通過字符串形式
with topic.get_sync_producer() as producer:
    for i in range(4):
            producer.produce(‘test message ‘ + str(i ** 2))

###

consume.py

# -*- coding:utf-8 -*-
from pykafka import KafkaClient

client = KafkaClient(hosts=192.168.0.100:9092)

topic=client.topics[test]

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=test_kafka_group,
    auto_commit_enable=False,
    # 設置為False的時候不需要添加consumer_group,直接連接topic即可取到消息
zookeeper_connect=192.168.0.100:2181#這裏可以連接多個zk ) for message in balanced_consumer: # print message if message is not None: print message.offset, message.value #打印接收到的消息體的偏移個數和值

###

Python # pykafka - Producer and Consume