1. 程式人生 > >KafkaConsumer多執行緒優化

KafkaConsumer多執行緒優化

優化目的

KafkaConsumer是以單執行緒模式執行,為了提升consumer的消費能力,多執行緒是一個很好的選擇。KafkaConsumer和KafkaProducer不同,後者是執行緒安全的,因此我們鼓勵使用者在多個執行緒中共享一個KafkaProducer例項,這樣通常都要比每個執行緒維護一個KafkaProducer例項效率要高。但對於KafkaConsumer而言,它不是執行緒安全的,所以實現多執行緒時通常有兩種實現方法​

方案1 每個執行緒內部維護自己的consumer物件​

在這裡插入圖片描述 缺點:​

  • 更多的TCP連線開銷(每個執行緒都要維護若干個TCP連線)​
  • consumer數受限於topic分割槽數,擴充套件性差​
  • 頻繁請求導致吞吐量下降​
  • 執行緒自己處理消費到的訊息可能會導致超時,從而造成rebalance

方案2 Consumer內部維護自己的worker執行緒池​

在這裡插入圖片描述 缺點:​

  • 實現較複雜​
  • 執行緒池容量規劃​
  • 限流策略​
  • 優雅關閉

作者採用第二種方式進行實現,在實現過程中發了一些需要注意的細節,所以寫下本文做記錄

Consumer V1.0

	private ExecutorService scheduler;{​
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat
("base-consumer-pool-%d").build();​ scheduler = new ThreadPoolExecutor(4, 8, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(MAX_QUEUE_SIZE), threadFactory,new ThreadPoolExecutor.AbortPolicy());}@KafkaListener(topics = "test"
)public void listen(ConsumerRecord<?, ?> record) {​ Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {​ scheduler.execute(() -> {//do something​ });}}

problem

當下遊業務吞吐量,低於consumer的吞吐量時,訊息會線上程池中積壓,最終導致執行緒池滿,丟擲RejectedException​

​當我們設計執行緒池時,初衷是為了提升consumer的消費能力​,當系統處理能力低於kafkaConsumer消費能力時,我們希望積壓的​,訊息保留在kafka中,而不是被worker執行緒池rejected​

解決方案:自旋限流 這裡借鑑了自旋鎖的思路

自旋鎖(spin lock)​ 是一種非阻塞鎖,也就是說,如果某執行緒需要獲取鎖,但該鎖已經被其他執行緒佔用時,該執行緒不會被掛起,而是在不斷的消耗CPU的時間,不停的試圖獲取鎖,從而減少執行緒切換開銷​

自旋限流 (spin limit)​ 在消費執行緒中,若執行緒佇列大於限流閾值,則掛起當前執行緒,休眠後,再次驗證佇列長度,直到佇列長度小於閾值

Consumer V1.1

	private ExecutorService scheduler;private BlockingQueue<Runnable> workQueue;private static final int MAX_QUEUE_SIZE = 1024;{​
        workQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);​
        scheduler = new ThreadPoolExecutor(4, 8, 0L, TimeUnit.MILLISECONDS,​
                workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());}@KafkaListener(topics = "test")public void listen(ConsumerRecord<?, ?> record) {​
       Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {//自旋阻塞consumer, 讓消費不了的訊息儲存在kafka中,​ 而不是被worker執行緒池rejected​
            while (workQueue.size() >= MAX_QUEUE_SIZE) {​
                    Thread.sleep(100);}​
            scheduler.execute(() -> {//do something​
            });}}

problem

當執行緒佇列到達閾值後,自旋阻塞了consumer的消費過程​,執行緒池沒有用上maximumPoolSize屬性​ 解決這個問題需要清楚CorePoolSize、MaximumPoolSize和workQueue的含義​

執行緒池處理策略​

  • 如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;​
  • 如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務;​
  • 如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;​

Consumer V1.2

	private ExecutorService scheduler;private BlockingQueue<Runnable> workQueue;private static final int MAX_QUEUE_SIZE = 1024;private static final int MAX_POOL_SIZE = 8;{​
        workQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);​
        scheduler = new ThreadPoolExecutor(4, MAX_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,​
                workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());}@KafkaListener(topics = "test")public void listen(ConsumerRecord<?, ?> record) {​
       Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {//驗證佇列長隊的同時校驗活躍執行緒數​
            while (workQueue.size() >= MAX_QUEUE_SIZE && 
            		MAX_POOL_SIZE - ((ThreadPoolExecutor) scheduler).getActiveCount() <= 0 ){​
                Thread.sleep(100);}​
            scheduler.execute(() -> { //do something });​
        }}

執行緒池優雅關閉

  • Runtime.getRuntime().addShutdownHook()​
  • implements DisposableBean ​
  • 程序需要使用kill -15進行關閉​
    @Overridepublic void destroy() throws Exception {this.scheduler.shutdown();awaitTermination(this.scheduler);}

    private void awaitTermination(ExecutorService executor) {try{​
           executor.awaitTermination(30, TimeUnit.SECONDS)} catch (InterruptedException ex) {​
            Thread.currentThread().interrupt();}}