
Connecting Kafka with Flume, and Kafka development in Python

1. Connecting Kafka with Flume

Modify the Flume configuration file, changing the sink configuration to the following:

See the Flume user guide:

http://flume.apache.org/FlumeUserGuide.html#kafka-sink


# define the agent's source, channel, and sink
a5.sources = sr1
a5.channels = ch1
a5.sinks = log1
# watch a spooling directory for new files
a5.sources.sr1.type = spooldir
a5.sources.sr1.spoolDir = /home/hadoop/hahaha_test1
# channel settings
a5.channels.ch1.type = memory
# sink settings: publish events to the Kafka topic cctv3 (change the topic name as needed)
a5.sinks.log1.type = org.apache.flume.sink.kafka.KafkaSink
a5.sinks.log1.kafka.topic = cctv3
a5.sinks.log1.kafka.bootstrap.servers = localhost:9092
a5.sinks.log1.kafka.flumeBatchSize = 20
a5.sinks.log1.kafka.producer.acks = 1
# bind the source and the sink to the channel
a5.sources.sr1.channels = ch1
a5.sinks.log1.channel = ch1


Start Flume so that it connects to Kafka; example commands are shown below.
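For reference, on a single-node setup the topic can be created and the agent started roughly as follows. The config file name kafka-sink.conf and the paths are assumptions, so adjust them to your installation; newer Kafka versions use --bootstrap-server localhost:9092 instead of --zookeeper:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cctv3
flume-ng agent --conf conf --conf-file conf/kafka-sink.conf --name a5 -Dflume.root.logger=INFO,console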


2. Kafka development in Python

Install the Kafka client library both in the PyCharm environment and on Linux: pip install kafka-python (this is the package that provides the kafka module imported below).

Write the producer in PyCharm:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
while True:
    msg = input('Enter a message, or "bye" to quit: ')
    if msg == 'bye':
        print('Goodbye')
        break
    # send the message to the cctv3 topic and flush so it is delivered immediately
    producer.send('cctv3', msg.encode())
    producer.flush()

Write the consumer:

from kafka import KafkaConsumer

# subscribe to the cctv3 topic and print each message as it arrives
consumer = KafkaConsumer('cctv3', bootstrap_servers='localhost:9092')
for msg in consumer:
    print(msg.value.decode())
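
By default this consumer only sees messages produced after it starts. If earlier messages are also needed, kafka-python accepts group_id and auto_offset_reset parameters; the group name below is a made-up example:

from kafka import KafkaConsumer

# start reading from the earliest available offset the first time this group connects
consumer = KafkaConsumer('cctv3',
                         bootstrap_servers='localhost:9092',
                         group_id='cctv3-demo',           # hypothetical consumer group
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value.decode())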

Use Xftp as a bridge to transfer the .py files to the hadoop user's directory on the server:

Run python3 producerdemo.py to start the producer, and start the consumer the same way.

Python can also connect to a database directly, pull out the data, and analyze it.
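
A minimal sketch of that idea, assuming a MySQL database and the pymysql library; the connection parameters and table name are placeholders, not from the original post:

import pymysql

# connect to the database; host/user/password/database are placeholders
conn = pymysql.connect(host='localhost', user='hadoop',
                       password='your_password', database='test')
cursor = conn.cursor()
cursor.execute('SELECT * FROM some_table')   # hypothetical table
for row in cursor.fetchall():
    print(row)                               # analyze rows as needed
cursor.close()
conn.close()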