1. 程式人生 > >Kafka生產者消費者例項

Kafka生產者消費者例項

主要實現Kafka消費者和生產者最基礎功能。

消費者例項:

public class MyKafkaConsumer implements Runnable {
private String topic;
public MyKafkaConsumer(String topic) {
super();
this.topic = topic;
}
//載入kafka配置資訊
public Properties createKafkaProperties() {
InputStream in = MyKafkaConsumer.class.getResourceAsStream("/kafka.properties");
Properties prop = new Properties();
try {
prop.load(in);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return prop;
}
@Override
public void run() {
//獲取kafka連線
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(createKafkaProperties()));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[],byte[]> stream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[],byte[]> iterator = stream.iterator();
while(iterator.hasNext()) {
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
}

}
public static void main(String[] args) {
new Thread(new MyKafkaConsumer("test")).start();
}
}

生產者例項:

public class MyKafkaProducer implements Runnable {
private String topic;
private LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
public MyKafkaProducer(String topic,LinkedBlockingQueue<String> queue) {
super();
this.topic = topic;
this.queue = queue;
}
public Properties createProperties() {
InputStream in = MyKafkaProducer.class.getResourceAsStream("/kafkaProducer");
Properties prop = new Properties();
try {
prop.load(in);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return prop;
}

@Override
public void run() {
Producer<String, String> producer = new Producer<>(new ProducerConfig(createProperties()));

while(true) {

try {

//此處可以為傳送的訊息指定Key和Value,進而能夠根據指定key進行相應分割槽

producer.send(new KeyedMessage<String, String>(topic, queue.take()));
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

}

注:新增prop.put("serializer.class", StringEncoder.class.getName());語句,否則報序列化錯誤