1. 程式人生 > 程式設計 >python3連線kafka模組pykafka生產者簡單封裝程式碼

python3連線kafka模組pykafka生產者簡單封裝程式碼

1.1安裝模組

pip install pykafka

1.2基本使用

# -* coding:utf8 *- 
from pykafka import KafkaClient 
host = 'IP:9092,IP:9092,IP:9092'
client = KafkaClient(hosts = host) 
# 生產者 
topicdocu = client.topics['my-topic'] 
producer = topicdocu.get_producer() 
for i in range(100): 
 print i 
 producer.produce('test message ' + str(i ** 2)) 
producer.stop()

1.3簡單封裝

class KafkaProduct():

 def __init__(self,hosts,topic):
  """
  初始化例項
  :param hosts: 連線地址
  :param topic:
  """
  self.__client = KafkaClient(hosts=hosts)
  self.__topic = self.__client.topics[topic.encode()]

 def __set_topic(self,topic):
  self.__topic = self.__client.topics[topic.encode()]

 def set_topic(self,topic):
  """
  設定topic
  :param topic:
  :return:
  """
  self.__set_topic(topic)

 def get_topics(self):
  """
  獲取當前所有topic
  :return:
  """
  return self.__client.topics

 def get_topic(self):
  """
  獲取當前topic
  :return:
  """
  return self.__topic

 def Producer(self):
  """
  生產者物件
  :return:
  """
  with self.__topic.get_producer(delivery_reports=True) as producer:
   next_data = ''
   while True:
    if next_data:
     producer.produce(str(next_data).encode())
    next_data = yield True

 def send_data(self,datas):
  """
  傳送資料
  :param datas:需要傳入的可迭代物件
  :return:
  """
  c = self.Producer()
  next(c)
  for i in datas:
   c.send(i)

if __name__ == '__main__':

hosts = "1.2.3.4:9999,2.3.4.5:9090" #連線hosts
topic = "test_523"
K = KafkaProduct(hosts=hosts,topic=topic) #
#K.set_topic("test") #切換設定新的topic
K.get_topic() #獲取當前設定的topic
#K.get_topics() #獲取所有topic
data = range(10000) #要傳送的可迭代物件
K.send_data(data)

以上這篇python3連線kafka模組pykafka生產者簡單封裝程式碼就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。