一種非同步消費kafka訊息的實現機制
一種非同步消費kafka訊息的實現機制
本文將從訊息流轉過程以及各步驟實現方式來進行闡述,程式碼基於springboot專案,配置檔案yml格式:
1.專案啟動時啟動kafka訊息消費執行緒
消費kafka訊息的類實現一個生命週期管理介面,這個介面自己定義,我這設為LifeCycle。
public interface LifeCycle { /** * start */ void startup(); /** * 生命週期結束時呼叫 */ void shutdown(); }
該LIfeCycle類在元件生命週期管理類ComponentContainer(自定義)中進行管理,該管理類實現org.springframework.context中的ApplicationListener介面。
/** * 元件生命週期管理 */ @Slf4j @Component public class ComponentContainer implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { // get beans of LifeCycle Map<String, LifeCycle> components = event.getApplicationContext().getBeansOfType(LifeCycle.class); Collection<LifeCycle> instances = retrievalInstance(components); // startup instances.forEach(LifeCycle::startup); // shutdown Runtime.getRuntime().addShutdownHook(new Thread(() -> { instances.forEach(LifeCycle::shutdown); })); } /** * retrieval instance of LifeCycle */ private Collection<LifeCycle> retrievalInstance(Map<String, LifeCycle> components) { Collection<LifeCycle> allInstances = components == null ? new ArrayList<>() : new ArrayList<>(components.values()); return allInstances; } }
這樣程式啟動時,就會執行LifeCycle介面的實現類的startup方法了。
2.接收kafka訊息
註解org.springframework.kafka.annotation.KafkaListener監聽kafka訊息,在yml配置檔案中配置好topics和containerFactory的值
@KafkaListener(topics = { "${kafka.XXX.topics}" }, containerFactory = "${kafka.XXX.properties.listener-names}") public void onMessage1(ConsumerRecord<String, String> record) { try { LOGGER.info("收到訊息 record = {}",record.value()); doDealMessage(record.value()); } catch (Exception e) { LOGGER.info("處理訊息出錯 record = {}",record.value()); } }
3.將kafka訊息新增進對應的阻塞佇列,消費訊息
kafka訊息消費類MessageConsumer:
兩個具體的訊息消費類:Message1Consumer ,Message2Consumer
@Slf4j
@Service
public class MessageConsumer implements LifeCycle {
/**
* 資料中轉佇列
*/
private CommonQueue<String> queue1;
private CommonQueue<String> queue2;
/**
* 收到kafka訊息
* @param record
*/
@KafkaListener(topics = {
"${kafka.XXX.topics}"
}, containerFactory = "${kafka.XXX.properties.listener-name}")
public void onMessage1(ConsumerRecord<String, String> record) {
try {
LOGGER.info("收到訊息 record = {}",record.value());
doDealMessage1(record.value());
} catch (Exception e) {
LOGGER.info("處理訊息出錯 record = {}",record.value());
}
}
/**
* 收到kafka訊息
* @param record
*/
@KafkaListener(topics = {
"${kafka.XXX.topics}"
}, containerFactory = "${kafka.XXX1.properties.listener-name}")
public void onMessage(ConsumerRecord<String, String> record) {
try {
LOGGER.info("收到訊息 record ={}",record.value());
doDealMessage2(record.value());
} catch (Exception e) {
LOGGER.info("處理訊息出錯 record = {}",record.value());
}
}
public void doDealMessage1(String data) {
queue1.add(data);
}
public void doDealMessage2(String data) {
queue2.add(data);
}
@Override
public void startup() {
queue1 = new CommonQueue<>(new Message1Consumer());
queue2 = new CommonQueue<>(new Message2Consumer());
}
@Override
public void shutdown() {
if (queue1 != null) {
queue1.shutdown();
}
if (queue2 != null) {
queue2.shutdown();
}
}
/**
* 資料1消費佇列
*/
private class Message1Consumer implements QueueConsumer<String> {
@Override
public void accept(String messageVo) {
// 處理
try {
//處理訊息1
}
} catch (Exception e) {
LOGGER.error("處理圖譜資料出現異常,data={}", messageVo, e);
}
}
}
/**
* 資料2消費佇列
*/
private class Message2Consumer implements QueueConsumer<String> {
@Override
public void accept(String messageVo) {
try {
//處理訊息2
} catch (Exception e) {
LOGGER.error("處理訊息出現異常,data={}", messageVo, e);
}
}
}
}
CommonQueue類:佇列類,初始化阻塞佇列,並開啟執行緒
public class CommonQueue<T> {
private final Queue<T> queue;
private final Thread consumerThread;
private volatile boolean actived = true;
public CommonQueue(QueueConsumer<T> consumer) {
this.queue = new ArrayBlockingQueue<>(2000);
this.consumerThread = new Thread(new Consumer(queue, consumer), "common-queue-consumer-thread");
this.consumerThread.start();
}
public boolean add(T e) {
return queue.add(e);
}
public void shutdown() {
this.actived = false;
}
private class Consumer implements Runnable {
private Queue<T> queue;
private QueueConsumer<T> consumer;
public Consumer(Queue<T> queue, QueueConsumer<T> consumer) {
this.queue = queue;
this.consumer = consumer;
}
@Override
public void run() {
while (actived) {
T e = queue.poll();
if (e != null) {
this.consumer.accept(e);
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
}
}
QueueConsumer介面:具體的訊息消費類實現該介面
public interface QueueConsumer<T> {
void accept(T e);
}
4.程式異常處理機制
當程式出錯時,停止執行緒處理阻塞佇列中的訊息
public void shutdown() {
this.actived = false;
}
JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,可以註冊一個JVM關閉的鉤子,這個鉤子可以在一下幾種場景中被呼叫:
- 程式正常退出
- 使用System.exit()
- 終端使用Ctrl+C觸發的中斷
- 系統關閉
- OutOfMemory宕機
- 使用Kill pid命令幹掉程序(注:在使用kill -9 pid時,是不會被呼叫的)
5.總結
該實現機制在獲取到kafka訊息後,將訊息存到本地阻塞佇列ArrayBlockingQueue中,一類訊息擁有自己的佇列,讓對應的執行緒去取並處理該阻塞佇列中的訊息;一方面可以儘快的消費kafka的訊息,防止消費者無法跟上資料生成的速度;另一方面容易擴充套件,具體的訊息消費類實現通用accept()方法,實現方法的具體邏輯即可在新執行緒中非同步執行消費,不需要在具體的消費類中關注是否開啟新執行緒執行。