A single-threaded Kafka produce/consume test
阿新 • Published 2018-11-03
Code:
package com.weichai.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * A simple Kafka producer example.
 * @author lhy
 * @date 2018.10.09
 */
public class SimpleProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Connect to the broker on Kafka's default port, 9092
        props.setProperty("metadata.broker.list", "localhost:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // Treat a write as successful once the partition leader acknowledges it
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // Build the message: topic "SimpleNode", payload "Kafka Simple Test"
        KeyedMessage<String, String> data =
                new KeyedMessage<String, String>("SimpleNode", "Kafka Simple Test");
        int i = 1;
        try {
            // Send the message 99 times
            while (i < 100) {
                producer.send(data);
                i++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
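The listing above uses the legacy Scala producer API (kafka.javaapi.producer) that shipped with Kafka 0.8. For comparison, here is a minimal sketch of the same send loop using the newer Java client (org.apache.kafka.clients.producer, available from kafka-clients 0.8.2 onward). The class name NewApiProducer is made up for this sketch; the topic and broker address are the same as above, and the new client talks to the broker directly:

package com.weichai.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical class name; a sketch of the same test with the newer client API
public class NewApiProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // The new client bootstraps from the brokers themselves
        props.put("bootstrap.servers", "localhost:9092");
        // Equivalent of request.required.acks=1 in the old API
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // Same workload: send the test message 99 times
            for (int i = 1; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("SimpleNode", "Kafka Simple Test"));
            }
        } finally {
            producer.close();
        }
    }
}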
package com.weichai.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * A simple consumer test (fetching consumed data on a single thread).
 * @author lhy
 * @date 2018.10.09
 */
public class SimpleConsumer extends Thread {

    // Consumer connection
    private final ConsumerConnector consumer;
    // Topic to consume
    private final String topic;

    public SimpleConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    // Build the consumer configuration
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper address and port to connect to
        // props.put("zookeeper.connect", "localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
        props.put("zookeeper.connect", "localhost:2181");
        // The 'group.id' string defines the consumer group this process consumes on behalf of
        props.put("group.id", "0");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "10000");
        // 'zookeeper.sync.time.ms' is the number of milliseconds a ZooKeeper
        // follower can be behind the master before an error occurs
        props.put("zookeeper.sync.time.ms", "200");
        // 'auto.commit.interval.ms' is how often consumed offsets are written to
        // ZooKeeper. Since the commit frequency is time based rather than based on
        // the number of messages consumed, an error between commits means some
        // messages will be replayed after a restart.
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // Request one stream (one consuming thread) for the topic
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                consumer.createMessageStreams(topicMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("********* Kafka consumer output *********");
        while (true) {
            if (it.hasNext()) {
                // Print the received message (stderr makes it stand out in the IDE console)
                System.err.println(Thread.currentThread() + " get kafka data: "
                        + new String(it.next().message()));
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer consumerThread = new SimpleConsumer("SimpleNode");
        consumerThread.start();
    }
}
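The consumer above is the old high-level consumer, which coordinates through ZooKeeper. For comparison, a minimal sketch of the same test with the newer KafkaConsumer client (org.apache.kafka.clients.consumer, kafka-clients 0.9+) follows. The class name NewApiConsumer is hypothetical; with the new client, group management is handled by the brokers, so no zookeeper.connect setting is needed:

package com.weichai.kafka;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical class name; a sketch of the same test with the newer client API
public class NewApiConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // The new consumer connects to the brokers, not to ZooKeeper
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "0");
        // Auto-commit offsets every second, as in the old configuration
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("SimpleNode"));
        while (true) {
            // Block for up to one second waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(Thread.currentThread() + " get kafka data: " + record.value());
            }
        }
    }
}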