KafkaConsumer多執行緒優化
優化目的
KafkaConsumer是以單執行緒模式執行,為了提升consumer的消費能力,多執行緒是一個很好的選擇。KafkaConsumer和KafkaProducer不同,後者是執行緒安全的,因此我們鼓勵使用者在多個執行緒中共享一個KafkaProducer例項,這樣通常都要比每個執行緒維護一個KafkaProducer例項效率要高。但對於KafkaConsumer而言,它不是執行緒安全的,所以實現多執行緒時通常有兩種實現方法
方案1 每個執行緒內部維護自己的consumer物件
缺點:
- 更多的TCP連線開銷(每個執行緒都要維護若干個TCP連線)
- consumer數受限於topic分割槽數,擴充套件性差
- 頻繁請求導致吞吐量下降
- 執行緒自己處理消費到的訊息可能會導致超時,從而造成rebalance
方案2 Consumer內部維護自己的worker執行緒池
缺點:
- 實現較複雜
- 執行緒池容量規劃
- 限流策略
- 優雅關閉
作者採用第二種方式進行實現,在實現過程中發了一些需要注意的細節,所以寫下本文做記錄
Consumer V1.0
private ExecutorService scheduler;
{
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat ("base-consumer-pool-%d").build();
scheduler = new ThreadPoolExecutor(4, 8, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE), threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
@KafkaListener(topics = "test" )
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
scheduler.execute(() -> {
//do something
});
}
}
problem
當下遊業務吞吐量,低於consumer的吞吐量時,訊息會線上程池中積壓,最終導致執行緒池滿,丟擲RejectedException
當我們設計執行緒池時,初衷是為了提升consumer的消費能力,當系統處理能力低於kafkaConsumer消費能力時,我們希望積壓的,訊息保留在kafka中,而不是被worker執行緒池rejected
解決方案:自旋限流 這裡借鑑了自旋鎖的思路
自旋鎖(spin lock) 是一種非阻塞鎖,也就是說,如果某執行緒需要獲取鎖,但該鎖已經被其他執行緒佔用時,該執行緒不會被掛起,而是在不斷的消耗CPU的時間,不停的試圖獲取鎖,從而減少執行緒切換開銷
自旋限流 (spin limit) 在消費執行緒中,若執行緒佇列大於限流閾值,則掛起當前執行緒,休眠後,再次驗證佇列長度,直到佇列長度小於閾值
Consumer V1.1
private ExecutorService scheduler;
private BlockingQueue<Runnable> workQueue;
private static final int MAX_QUEUE_SIZE = 1024;
{
workQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
scheduler = new ThreadPoolExecutor(4, 8, 0L, TimeUnit.MILLISECONDS,
workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
}
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
//自旋阻塞consumer, 讓消費不了的訊息儲存在kafka中, 而不是被worker執行緒池rejected
while (workQueue.size() >= MAX_QUEUE_SIZE) {
Thread.sleep(100);
}
scheduler.execute(() -> {
//do something
});
}
}
problem
當執行緒佇列到達閾值後,自旋阻塞了consumer的消費過程,執行緒池沒有用上maximumPoolSize屬性 解決這個問題需要清楚CorePoolSize、MaximumPoolSize和workQueue的含義
執行緒池處理策略
- 如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;
- 如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務;
- 如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
Consumer V1.2
private ExecutorService scheduler;
private BlockingQueue<Runnable> workQueue;
private static final int MAX_QUEUE_SIZE = 1024;
private static final int MAX_POOL_SIZE = 8;
{
workQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
scheduler = new ThreadPoolExecutor(4, MAX_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
}
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
//驗證佇列長隊的同時校驗活躍執行緒數
while (workQueue.size() >= MAX_QUEUE_SIZE &&
MAX_POOL_SIZE - ((ThreadPoolExecutor) scheduler).getActiveCount() <= 0 ){
Thread.sleep(100);
}
scheduler.execute(() -> { //do something });
}
}
執行緒池優雅關閉
- Runtime.getRuntime().addShutdownHook()
- implements DisposableBean
- 程序需要使用kill -15進行關閉
@Override
public void destroy() throws Exception {
this.scheduler.shutdown();
awaitTermination(this.scheduler);
}
private void awaitTermination(ExecutorService executor) {
try{
executor.awaitTermination(30, TimeUnit.SECONDS)
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}