Java+Kafka訊息佇列
阿新 • • 發佈:2019-02-08
本文主要針對,Java端對Kafka訊息佇列的生產和消費。Kafka的安裝部署,請看檢視相關文章。
筆者最近所用的是Spring mvc,監聽檔案路徑,然後將讀取到的檔案內容傳送到訊息佇列中。由另外系統去消費訊息。
當然訊息佇列作為訊息交換機,本系統既有生產訊息也有消費訊息。不做詳述。
生成者程式碼相對簡單很多。
消費者程式碼package com.dhc.test.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.log4j.Logger; import java.util.Properties; public class ProducerHandler { private final KafkaProducer<String, String> producer; private static Logger logger = Logger.getLogger(DataInManager.class.getName()); public ProducerHandler(String topic,String message) { Properties props = new Properties(); props.put("bootstrap.servers”,"127.0.0.1:9092"); props.put("acks", "all"); props.put("retries", "0"); props.put("batch.size", "16384"); props.put("linger.ms", "1"); props.put("buffer.memory", "33554432"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"
); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(props); //生成訊息 ProducerRecord record = new ProducerRecord(topic,message); //傳送訊息 producer.send(record); logger.info("【kafka】向Kafka的TOPIC【" + topic + "】中傳送訊息"); logger.info("【kafka】訊息內容:" + message); logger.info("【kafka】推送成功"); } }
package com.dhc.test.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.log4j.Logger; import java.util.List; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ConsumerHandler { static Logger logger = Logger.getLogger(DataInManager.class.getName()); private final KafkaConsumer<String, String> consumer; private ExecutorService executors; public ConsumerHandler(List<String> topics) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("group.id", "test"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(topics); execute(1); } public void execute(int workerNum) { executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy()); Thread t = new Thread(new Runnable(){//啟動一個子執行緒來監聽kafka訊息 public void run(){ while (true) { ConsumerRecords<String, String> records = consumer.poll(200); for (final ConsumerRecord record : records) { logger.info("【Kafka】監聽到kafka的TOPIC【" + record.topic() + "】的訊息"); logger.info("【Kafka】訊息內容:" + record.value()); executors.submit(new ConsumerWorker(record)); } } }}); t.start(); } public void shutdown() { if (consumer != null) { consumer.close(); } if (executors != null) { executors.shutdown(); } try { if (!executors.awaitTermination(10, TimeUnit.SECONDS)) { logger.info("【Kafka】Timeout.... Ignore for this case "); } } catch (InterruptedException ignored) { logger.info("【Kafka】Other thread interrupted this shutdown, ignore for this case."); Thread.currentThread().interrupt(); } } }
package com.dhc.test.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
public class ConsumerWorker implements Runnable {
private ConsumerRecord<String, String> consumerRecord;
public ConsumerWorker(ConsumerRecord record) {
this.consumerRecord = record;
}
private static Logger logger = Logger.getLogger(DataInManager.class.getName());
public void run() {
// consumer接收訊息後,這裡可以寫針對收到的訊息的業務處理
System.out.println(consumerRecord.value());
}
}
main方法啟動
package com.dhc.test;
import com.dhc.test.kafka.ConsumerHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Start {
public static void main(String[] args) throws Exception {
// 啟動Kafka consumer監視
List<String> topics = new ArrayList<String>();
// 監聽的訊息通道
topics.add("test");
new ConsumerHandler(topics);
}
}
謝謝關注!