Kafka Producer(Python threading)
阿新 • • 發佈:2017-07-31
bootstra tar () imp bootstrap pla oot produce init
import threading
import time
import random
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=‘192.168.1.10:9092‘)
threads = []
class MyThread(threading.Thread):
def __init__(self, threadName, delay):
threading.Thread.__init__(self)
self.threadName=threadName
self.delay=delay
def run(self):
sendinfo(self.threadName, self.delay)
def sendinfo( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
data = "".join(random.sample(
[‘a‘, ‘b‘, ‘c‘, ‘d‘, ‘e‘, ‘f‘, ‘g‘, ‘h‘, ‘i‘, ‘j‘, ‘k‘, ‘l‘, ‘m‘, ‘n‘, ‘o‘, ‘p‘, ‘q‘, ‘r‘, ‘s‘, ‘t‘, ‘u‘, ‘v‘,
‘w‘, ‘x‘, ‘y‘, ‘z‘], 10)).replace(" ", "")
word=("%s, %s, %s, %s" % (threadName, count, data, time.ctime(time.time())))
producer.send(‘test‘, key=threadName, value=word)
print (word)
try:
t1=MyThread("Thread-1",0)
threads.append(t1)
t2=MyThread("Thread-2",0)
threads.append(t2)
t3=MyThread("Thread-3",0)
threads.append(t3)
for t in threads:
t.start()
for t in threads:
t.join()
producer.send(‘test‘, key="Thread-1", value="exit")
producer.send(‘test‘, key="Thread-2", value="exit")
producer.send(‘test‘, key="Thread-3", value="exit")
print ("exit program with 0")
except:
print ("Error: failed to run producer program")
Kafka Producer(Python threading)