python kafka生產消費示例
阿新 • • 發佈:2018-12-23
# -*- coding: utf-8 -*-
'''''
使用kafka-Python 1.3.3模組
'''
import sys
import time
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
KAFAKA_HOST = "spider"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "test"
class Kafka_producer():
'''''
生產模組:根據不同的key,區分訊息
'''
def __init__(self, kafkahost,kafkaport, kafkatopic, key):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.key = key
print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)
bootstrap_servers = '{kafka_host}:{kafka_port}' .format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
)
print("boot svr:",bootstrap_servers)
self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers
)
def sendjsondata(self, params):
try:
parmas_message = json.dumps(params,ensure_ascii=False)
producer = self.producer
print(parmas_message)
v = parmas_message.encode('utf-8')
k = key.encode('utf-8')
print("send msg:(k,v)",k,v)
producer.send(self.kafkatopic, key=k, value= v)
producer.flush()
except KafkaError as e:
print (e)
class Kafka_consumer():
'''''
消費模組: 通過不同groupid消費topic裡面的訊息
'''
def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.key = key
self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort )
)
def consume_data(self):
try:
for message in self.consumer:
yield message
except KeyboardInterrupt as e:
print (e)
def main(xtype, group, key):
'''''
測試consumer和producer
'''
if xtype == "p":
# 生產模組
producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
print ("===========> producer:", producer)
for _id in range(100):
params = '{"訊息" : "%s"}' % str(_id)
# 這種方式會將引號都打上\,
可以直接用python物件
params=[{"訊息0" :_id},{"訊息1" :_id}]
producer.sendjsondata(params)
time.sleep(1)
if xtype == 'c':
# 消費模組
consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
print ("===========> consumer:", consumer)
message = consumer.consume_data()
for msg in message:
print ('msg---------------->k,v', msg.key,msg.value)
print ('offset---------------->', msg.offset)
if __name__ == '__main__':
xtype = sys.argv[1]
group = sys.argv[2]
key = sys.argv[3]
main(xtype, group, key)