1. 程式人生 > >使用IO流模擬Kafka生產者生產資料

使用IO流模擬Kafka生產者生產資料

java程式碼使用IO流模擬生產者生產資料

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class
yuyiproducer02 { public static void main(String[] args) { final Properties prop = new Properties(); //broker 的地址 prop.put("metadata.broker.list","hadoop01:9092,hadoop02:9092,hadoop03:9092"); //設定序列化 prop.put("serializer.class","kafka.serializer.StringEncoder"); //
final ProducerConfig config = new ProducerConfig(prop); final Producer<String, String> producer = new Producer<>(config); //新建一個FileReader, 用來讀取資料 FileReader fis = null; // new 一個BufferReader, 用來讀取一行資料 BufferedReader bis = null; try
{ //讀取檔案的路徑 fis = new FileReader("D:\\order.log"); bis = new BufferedReader(fis); } catch (FileNotFoundException e) { e.printStackTrace(); } finally { } String str = ""; try { //每次讀取一行 while ((str=bis.readLine())!=null){ //topic 名 一行資料 final KeyedMessage<String, String> messg = new KeyedMessage<String, String>("orderDemo",str); System.out.println(str); try { Thread.sleep(2000); //每隔兩秒發一次 } catch (InterruptedException e) { e.printStackTrace(); } //生產資料 producer.send(messg); } } catch (IOException e) { e.printStackTrace(); } } }