Connecting Flume to Kafka, and Kafka development in Python
1. Connecting Flume to Kafka:
Modify Flume's configuration file, changing the sinks section as follows.
See the Flume user guide:
http://flume.apache.org/FlumeUserGuide.html#kafka-sink
# Define the agent's source, channel, and sinks
a5.sources = sr1
a5.channels = ch1
a5.sinks = log1
# Watch a spooling directory for new files
a5.sources.sr1.type=spooldir
a5.sources.sr1.spoolDir=/home/hadoop/hahaha_test1
# Channel settings
a5.channels.ch1.type=memory
# Sink settings
a5.sinks.log1.type = org.apache.flume.sink.kafka.KafkaSink
# Change the topic to your own (inline comments are not allowed after the value)
a5.sinks.log1.kafka.topic = cctv3
a5.sinks.log1.kafka.bootstrap.servers = localhost:9092
a5.sinks.log1.kafka.flumeBatchSize = 20
a5.sinks.log1.kafka.producer.acks = 1
# Wire the source and sink to the channel
a5.sources.sr1.channels=ch1
a5.sinks.log1.channel=ch1
Start Flume to connect it to Kafka.
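The agent above can be launched with flume-ng and the sink verified with the stock Kafka console consumer. The config file name kafka-sink.conf and the assumption that the Kafka scripts are on PATH are mine, not from the notes; older Kafka versions take --zookeeper instead of --bootstrap-server when creating topics.

```shell
# Create the topic the sink writes to (single-node defaults assumed)
kafka-topics.sh --create --topic cctv3 --partitions 1 --replication-factor 1 \
    --bootstrap-server localhost:9092

# Start the Flume agent defined above; the agent name must match "a5"
flume-ng agent --conf conf --conf-file kafka-sink.conf --name a5 \
    -Dflume.root.logger=INFO,console

# In another terminal, watch messages arriving on the topic
kafka-console-consumer.sh --topic cctv3 --bootstrap-server localhost:9092
```

Dropping a file into /home/hadoop/hahaha_test1 should then make its lines appear in the console consumer.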
2. Kafka development in Python
Install the Kafka client both in PyCharm and on the Linux machine: pip install kafka-python (this is the maintained package that provides the kafka module imported below).
Write the producer in PyCharm:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
while True:
    msg = input('Enter a message, or "bye" to quit: ')
    if msg == 'bye':
        print('Goodbye')
        break
    producer.send('cctv3', msg.encode())
    producer.flush()
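The interactive loop above can only be exercised by typing at a prompt. As a sketch, the same send-until-bye logic can be factored into a function that takes any iterable of lines and any producer-like object; pump_messages is a hypothetical helper of mine, not part of kafka-python.

```python
def pump_messages(lines, producer, topic='cctv3'):
    """Send each line to `topic` until 'bye' is seen; return the count sent."""
    sent = 0
    for msg in lines:
        if msg == 'bye':
            break
        producer.send(topic, msg.encode())  # kafka-python expects bytes
        sent += 1
    producer.flush()  # block until queued messages are handed off
    return sent
```

With a real KafkaProducer this behaves like the interactive loop; in tests, any stub object with send and flush methods can stand in for the broker.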
Write the consumer:
from kafka import KafkaConsumer

consumer = KafkaConsumer('cctv3', bootstrap_servers='localhost:9092')
for msg in consumer:
    print(msg.value.decode())
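The .encode() on the producer side and .decode() on the consumer side pair up because kafka-python transports raw bytes; a quick round trip shows what happens to a message in between:

```python
# Strings become bytes before sending and back to strings after receiving.
original = 'hello cctv3'
wire = original.encode('utf-8')      # what producer.send() puts on the wire
restored = wire.decode('utf-8')      # what the consumer loop prints
print(restored == original)
```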
Use Xftp as a bridge to transfer the .py files to the hadoop machine:
Run python3 producerdemo.py to start the producer; start the consumer the same way.
Python can also connect directly to a database to pull data and analyze it.
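As a minimal illustration of that last point, consumed messages could be written into SQLite (Python's standard-library database) and queried afterwards. The sample list here stands in for messages pulled off the 'cctv3' topic; the table layout is an assumption of mine.

```python
import sqlite3

# Sample rows; in the pipeline above these would come from the consumer loop.
messages = ['line one', 'line two', 'line three']

conn = sqlite3.connect(':memory:')  # throwaway in-memory database
conn.execute('CREATE TABLE logs (id INTEGER PRIMARY KEY, body TEXT)')
conn.executemany('INSERT INTO logs (body) VALUES (?)',
                 [(m,) for m in messages])
conn.commit()

count = conn.execute('SELECT COUNT(*) FROM logs').fetchone()[0]
print(count)  # number of stored messages
```

The same pattern works with any DB-API driver (MySQL, PostgreSQL, ...) by swapping the connect call.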