使用IO流模擬Kafka生產者生產資料
阿新 • • 發佈:2018-12-21
Java 程式碼:使用 IO 流讀取檔案,模擬 Kafka 生產者生產資料
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
/**
 * Simulates a Kafka producer by replaying a local log file.
 *
 * <p>Reads {@code D:\order.log} line by line and publishes each line as a
 * message to the {@code orderDemo} topic, pausing two seconds before each send.
 */
public class yuyiproducer02 {

    /**
     * Entry point: configures the producer, streams the file to Kafka, and
     * releases the file handle and the producer when done.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final Properties prop = new Properties();
        // Broker addresses the producer fetches metadata from.
        prop.put("metadata.broker.list", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Message values are encoded as strings.
        prop.put("serializer.class", "kafka.serializer.StringEncoder");

        final ProducerConfig config = new ProducerConfig(prop);
        final Producer<String, String> producer = new Producer<>(config);

        // try-with-resources closes the reader (and the wrapped FileReader) on
        // every exit path; a missing file now ends the run cleanly instead of
        // leaving a null reader that would fail on the first readLine().
        try (BufferedReader reader = new BufferedReader(new FileReader("D:\\order.log"))) {
            String line;
            // Read one line per iteration until end of file.
            while ((line = reader.readLine()) != null) {
                // Topic name, then the line as the message payload.
                final KeyedMessage<String, String> message =
                        new KeyedMessage<String, String>("orderDemo", line);
                System.out.println(line);
                try {
                    Thread.sleep(2000); // send one message every two seconds
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop producing; continuing
                    // would make every later sleep fail immediately.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    break;
                }
                // Publish the message.
                producer.send(message);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's connections to the brokers.
            producer.close();
        }
    }
}