1. 程式人生 > >總結kafka的consumer消費能力很低的情況下的處理方案

總結kafka的consumer消費能力很低的情況下的處理方案

簡介

由於專案中需要使用kafka作為訊息佇列,並且專案是基於spring-boot來進行構建的,所以專案採用了spring-kafka作為原生kafka的一個擴充套件庫進行使用。先說明一下版本:

  • spring-boot 的版本是1.4.0.RELEASE
  • kafka 的版本是0.9.0.x 版本
  • spring-kafka 的版本是1.0.3.RELEASE

用過kafka的人都知道,對於使用kafka來說,producer的使用相對簡單一些,只需要把資料按照指定的格式傳送給kafka中某一個topic就可以了。本文主要是針對spring-kafka的consumer端上的使用進行簡單一些分析和總結。

kafka的速度是很快,所以一般來說producer的生產訊息的邏輯速度都會比consumer的消費訊息的邏輯速度快。

具體案例

之前在專案中遇到了一個案例是,consumer消費一條資料平均需要200ms的時間,並且在某個時刻,producer會在短時間內產生大量的資料丟進kafka的broker裡面(假設平均1s中內丟入了5w條需要消費的訊息,這個情況會持續幾分鐘)。

對於這種情況,kafka的consumer的行為會是:

  • kafka的consumer會從broker裡面取出一批資料,給消費執行緒進行消費。
  • 由於取出的一批訊息數量太大,consumer在session.timeout.ms
    時間之內沒有消費完成
  • consumer coordinator 會由於沒有接受到心跳而掛掉,並且出現一些日誌
    [rhllor] Tue Oct 18 21:39:16 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator coordinatorDead 529: kafka-example|NTI|Marking the coordinator 2147483646 dead.
    [rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0
    -kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendGroupMetadataRequest 465: kafka-example|NTI|Issuing group metadata request to broker 1 [rhllor] Tue Oct 18 21:39:16 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1 [rhllor] Tue Oct 18 21:39:16 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onComplete 424: kafka-example|NTI|Auto offset commit failed: Commit cannot be completed due to group rebalance [rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator run 408: kafka-example|NTI|Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handleGroupMetadataResponse 478: kafka-example|NTI|Group metadata response ClientResponse(receivedTimeMs=1476797957072, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]1d3d7e6, request=RequestSend(header={api_key=10,api_version=0,correlation_id=20,client_id=consumer-1}, body={group_id=new-message-1}), createdTimeMs=1476797956485, sendTimeMs=1476797956485), responseBody={error_code=0,coordinator={node_id=1,host=10.10.44.124,port=9092}}) [rhllor] Tue Oct 18 21:39:17 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1 [rhllor] Tue Oct 18 21:39:17 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator maybeAutoCommitOffsetsSync 445: kafka-example|NTI|Auto offset commit failed: [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare 247: kafka-example|NTI|Revoking previously assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2] [rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsRevoked 244: kafka-example|NTI|partitions revoked:[rhllor-log-0, rhllor-log-1, rhllor-log-2] [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1 [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=consumer-1-64063d04-9d4e-45af-a927-17ccf31c6ec1,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646 [rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 354: kafka-example|NTI|Attempt to join group new-message-1 failed due to unknown member id, resetting and retrying. [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1 [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646 [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 336: kafka-example|NTI|Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,members=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]} [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 225: kafka-example|NTI|Performing range assignment for subscriptions {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apache.kafka.clients.consume[email protected]} [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 229: kafka-example|NTI|Finished assignment: {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apach[email protected]4826f394} [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onJoinLeader 397: kafka-example|NTI|Issuing leader SyncGroup (SYNC_GROUP: {group_id=new-message-1,generation_id=1,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,group_assignment=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]}]}) to coordinator 2147483646 [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 423: kafka-example|NTI|Received successful sync group response for group new-message-1: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]} [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete 191: kafka-example|NTI|Setting newly assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2] [rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsAssigned 249: kafka-example|NTI|partitions assigned:[rhllor-log-0, rhllor-log-1, rhllor-log-2] [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator sendOffsetFetchRequest 581: kafka-example|NTI|Fetching committed offsets for partitions: [rhllor-log-0, rhllor-log-1, rhllor-log-2]
    日誌的意思大概是coordinator掛掉了,然後自動提交offset失敗,然後重新分配partition給客戶端
  • 由於自動提交offset失敗,導致重新分配了partition的客戶端又重新消費之前的一批資料
  • 接著consumer重新消費,又出現了消費超時,無限迴圈下去。

解決方案

遇到了這個問題之後, 我們做了一些步驟:

  • 提高了partition的數量,從而提高了consumer的並行能力,從而提高資料的消費能力
  • 對於單partition的消費執行緒,增加了一個固定長度的阻塞佇列和工作執行緒池進一步提高並行消費的能力
  • 由於使用了spring-kafka,則把kafka-client的enable.auto.commit設定成了false,表示禁止kafka-client自動提交offset,因為就是之前的自動提交失敗,導致offset永遠沒更新,從而轉向使用spring-kafka的offset提交機制。並且spring-kafka提供了多種提交策略:
    /**
       * The ack mode to use when auto ack (in the configuration properties) is false.
       * <ul>
       * <li>RECORD: Ack after each record has been passed to the listener.</li>
       * <li>BATCH: Ack after each batch of records received from the consumer has been
       * passed to the listener</li>
       * <li>TIME: Ack after this number of milliseconds; (should be greater than
       * {@code #setPollTimeout(long) pollTimeout}.</li>
       * <li>COUNT: Ack after at least this number of records have been received</li>
       * <li>MANUAL: Listener is responsible for acking - use a
       * {@link AcknowledgingMessageListener}.
       * </ul>
       */
      private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
    這些策略保證了在一批訊息沒有完成消費的情況下,也能提交offset,從而避免了完全提交不上而導致永遠重複消費的問題。

分析

那麼問題來了,為什麼spring-kafka的提交offset的策略能夠解決spring-kafka的auto-commit的帶來的重複消費的問題呢?下面通過分析spring-kafka的關鍵原始碼來解析這個問題。

  • 首先來看看spring-kafka的消費執行緒邏輯

    if (isRunning() && this.definedPartitions != null) { 
        initPartitionsIfNeeded();      
    // we start the invoker here as there will be no rebalance calls to       
    // trigger it, but only if the container is not set to autocommit       
    // otherwise we will process records on a separate thread      
       if (!this.autoCommit) {        
              startInvoker();     
       }
    }

    上面可以看到,如果auto.commit關掉的話,spring-kafka會啟動一個invoker,這個invoker的目的就是啟動一個執行緒去消費資料,他消費的資料不是直接從kafka裡面直接取的,那麼他消費的資料從哪裡來呢?他是從一個spring-kafka自己建立的阻塞佇列裡面取的。

  • 然後會進入一個迴圈,從原始碼中可以看到如果auto.commit被關掉的話, 他會先把之前處理過的資料先進行提交offset,然後再去從kafka裡面取資料。

  • 然後把取到的資料丟給上面提到的阻塞列隊,由上面建立的執行緒去消費,並且如果阻塞佇列滿了導致取到的資料塞不進去的話,spring-kafka會呼叫kafka的pause方法,則consumer會停止從kafka裡面繼續再拿資料。

  • 接著spring-kafka還會處理一些異常的情況,比如失敗之後是不是需要commit offset這樣的邏輯。

最後

  • spring-kafka是一個很好的用來操作kafka的庫,並且可以和spring進行完美結合。
  • spring-kafka提供了一些kafka使用上功能的擴充套件。
  • 相比於使用原生的kafka-client的api的話,使用更加簡單,需要編寫的碼量更少。
  • 最好能夠使用最新的kafka(0.10.0)和spring-kafka(1.1.1.RELEASE)的版本
轉載於:http://www.jianshu.com/p/4e00dff97f39

相關推薦

總結kafka的consumer消費能力情況處理方案

簡介 由於專案中需要使用kafka作為訊息佇列,並且專案是基於spring-boot來進行構建的,所以專案採用了spring-kafka作為原生kafka的一個擴充套件庫進行使用。先說明一下版本: spring-boot 的版本是1.4.0.RELEASEkafka 的

C#-搶紅包功能的分散式情況處理多併發

需求 需求經理設計了一個分享出去後,可以在微信群中搶優惠的活動。 簡單來說,就是每個參與活動的商品可以生成一個紅包池,分享到群裡後,可以像搶紅包一樣,去搶優惠金額。 問題 介面很快就根據需求設計開發出來了,並完善了相關活動規則。 但是多併發情況下,分享出去的紅

測試Nginx 和 Tomcat 高併發情況處理靜態頁面的效能

以下是 ab 壓力測試的結果(為了得到比較科學的資料可以進行多次的測試,一般至少10次) ab 可執行檔案的位置 /usr/local/web/apache/bin 測試命令:ab -n1000

chrome瀏覽器字型顏色太淺或者模糊的情況處理方法

*{font-weight:100!important;} *{text-shadow:0 0 1.5px #ccc !important;} *:not([cla

關於input在IE版本情況不兼容的解決辦法

不兼容 inpu pan app 兼容 type 一個 har charset 1 <!DOCTYPE html> 2 <html> 3 <head> 4 <meta charset="UTF-8"

記一次Kafka消費能力,重新分配節點問題優化

         目前在做一個車聯網APP專案。 專案中歷史軌跡的處理模式為kafka推送給我車輛報文,然後我自行判斷車輛熄火點火來進行歷史軌跡行程的儲存。        專案開始車輛較少,每次kafka推送

[httpd] httpd server 在負載的情況對SYN無響應

  如題: 兩臺client通過load balance訪問httpd server。兩個client互動訪問。load balance處於fullnat模式。 server在低負載情況下,常常對某一個client的請求無響應。   在server上,先後使用tcpdump,ns

大資料量情況查詢效能,耗時長的一種問題以及解決思路

背景交代: 1   mongodb 有500萬條資料                  2  經過過濾 還有20多萬條資料 要得到上述20w條資料,一次查詢得到20多萬條,很可能會產生效能問題,於

大數據量情況查詢性能,耗時長的一種問題以及解決思路

可能 數據 問題 skip 思路 原因 for 內存 mongo 背景交代: 1 mongodb 有500萬條數據 2 經過過濾 還有20多萬條數據 要得到上述20w條數據,一次查詢得到20多萬條,很可能會產生性能問題,於是同事用fo

2019.1.5 終於算是不上線的情況編寫完了一個web總結

專案過程 1.前端我們使用gulp來自動化開發流程。配置好gulp後,可以自動給我們處理好一些工作。比如寫完css後,要壓縮成.min.css,寫完js後,要做混淆和壓縮,圖片壓縮等。這些工作都可以讓gulp幫我們完成。 2. 前端頁面編寫。網站主要是整合視訊課程與文章閱覽。 例如:

移動裝置上4G和WIFI情況抓包總結

  總結一下自己使用過的移動裝置上的抓包方法。   移動裝置的作業系統主要包括Android,IOS以及WP。對於每一種系統來說,抓包的情況包括WIFI情況下的抓包以及移動等4G網路情況下的抓包。當然

Mysql模糊查詢like效率,以及更高效的寫法 在使用msyql進行模糊查詢的時候,自然的會用到like語句,通常情況下,在資料量小的時候,不容易看出查詢的效率,但在資料量達到百萬級,千萬級的時

在使用msyql進行模糊查詢的時候,很自然的會用到like語句,通常情況下,在資料量小的時候,不容易看出查詢的效率,但在資料量達到百萬級,千萬級的時候,查詢的效率就很容易顯現出來。這個時候查詢的效率就顯得很重要! 一般情況下like模糊查詢的寫法為(field已建立索引): SELECT `column

Kafka在高並發的情況,如何避免消息丟失和消息重復?kafka消費怎麽保證數據消費一次?數據的一致性和統一性?數據的完整性?

least 業務 針對 mar 完整 fse 依靠 更新 follow 1、kafka在高並發的情況下,如何避免消息丟失和消息重復? 消息丟失解決方案: 首先對kafka進行限速, 其次啟用重試機制,重試間隔時間設置長一些,最後Kafka設置acks=all,即需要相應的所

有return的情況try catch finally的執行順序總結

結論 1、不管有木有出現異常,finally塊中程式碼都會執行;2、當try和catch中有return時,finally仍然會執行;3、finally是在return後面的表示式運算後執行的(此

Xcode安裝版本的SDK(在Xcode裡下載不了的情況)

1.  進入 Uers../../Library/Caches/com.apple.dt.Xcode/Downloads目錄下  將第二個檔案copy到桌面,將其後綴名改為plist 2.

es 在資料量大的情況(數十億級別)如何提高查詢效率啊?

開發十年,就只剩下這套架構體系了! >>>   

es 在數據量大的情況(數十億級別)如何提高查詢效率啊?

做了 files 行數 引擎 占用 復雜 優化 ima 設計 面試題es 在數據量很大的情況下(數十億級別)如何提高查詢效率啊?面試官心理分析這個問題是肯定要問的,說白了,就是看你有沒有實際幹過 es,因為啥?其實 es 性能並沒有你想象中那麽好的。很多時候數據量大了,特別

Oracle單實例情況的library cache pin的問題模擬與問題分析

replace 等待事件 roc area oba lib plus ota sid Oracle單實例情況下的library cache pin的問題模擬與問題分析 參考自: WAITEVENT: "library cache pin" Reference Not

消費能力意識到一個遊

http .com 能力 com 的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一個遊的消費能力意識到一

如何在不使用系統函數的情況實現PHP中數組系統函數的功能

如何 利用 數組 關聯 uniq 出現的次數 回調 數組賦值 fun PHP中為我們提供了各種各樣的系統函數來實現我們需要的各種功能,那麽,在不使用系統函數的情況下我們要怎樣來實現這些功能呢?以下就是幾種系統函數的實現方式。 首先,我們來定義一個數組: $arr= arr