Kafka: Reading a Local File with a Producer
阿新 • Published 2018-11-17

The example below uses the old Kafka 0.8 Scala producer API to read a local file line by line and send each line as a message to the JsonData3 topic.
package com.qf.utils;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

import java.io.*;
import java.util.Properties;

public class CollectLog {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker list used to bootstrap cluster metadata
        properties.setProperty("metadata.broker.list",
                "mini4:9092,mini5:9092,mini6:9092");
        // Serializer applied to messages before they are sent to the broker
        properties.setProperty("serializer.class", StringEncoder.class.getName());
        // ZooKeeper quorum (the old producer does not actually need this;
        // brokers are discovered via metadata.broker.list)
        properties.setProperty("zookeeper.connect",
                "mini4:2181,mini5:2181,mini6:2181");
        // Acknowledgement level: 0 = no ack, 1 = wait for the leader's ack
        properties.setProperty("request.required.acks", "1");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);

        try {
            BufferedReader bf = new BufferedReader(
                    new FileReader(
                            new File(
                                    "D:\\qf大資料\\spark\\day13_專案\\考試i\\JsonTest.json")));
            String line = null;
            // Send the file line by line to the "JsonData3" topic,
            // pausing 5 seconds between messages
            while ((line = bf.readLine()) != null) {
                KeyedMessage<String, String> keyedMessage =
                        new KeyedMessage<String, String>("JsonData3", line);
                Thread.sleep(5000);
                producer.send(keyedMessage);
            }
            bf.close();
            producer.close();
            System.out.println("All messages sent.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
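
The kafka.javaapi.producer classes used above belong to the old Scala client, which was removed in later Kafka releases. As a minimal sketch (not the original author's code), the same file-to-topic loop with the modern org.apache.kafka.clients.producer.KafkaProducer (Kafka 0.9+) could look like the following; the broker addresses, file path, and JsonData3 topic are taken from the example above, while the NewCollectLog class name is only an illustrative placeholder.

package com.qf.utils;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

public class NewCollectLog {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // bootstrap.servers replaces metadata.broker.list in the new client
        props.put("bootstrap.servers", "mini4:9092,mini5:9092,mini6:9092");
        // String serializers for both key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=1 waits for the partition leader, matching request.required.acks=1 above
        props.put("acks", "1");

        // try-with-resources closes both the producer and the reader
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader reader = new BufferedReader(
                     new FileReader("D:\\qf大資料\\spark\\day13_專案\\考試i\\JsonTest.json"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Send each line as an unkeyed record to the JsonData3 topic
                producer.send(new ProducerRecord<String, String>("JsonData3", line));
                Thread.sleep(5000);
            }
        }
        System.out.println("All messages sent.");
    }
}

To check that the messages arrive, the console consumer shipped with Kafka can subscribe to the same topic, for example bin/kafka-console-consumer.sh --bootstrap-server mini4:9092 --topic JsonData3 --from-beginning (on 0.8-era brokers the older --zookeeper mini4:2181 form is used instead).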