1. 程式人生 > 實用技巧 >中通訊息平臺 Kafka 順序消費執行緒模型的實踐與優化

中通訊息平臺 Kafka 順序消費執行緒模型的實踐與優化

此文轉載自:https://blog.csdn.net/zchdjb/article/details/110427195

各類訊息中介軟體對順序訊息實現的做法是將具有順序性的一類訊息發往相同的主題分割槽中,只需要將這類訊息設定相同的 Key 即可,而 Kafka 會在任意時刻保證一個消費組同時只能有一個消費者監聽消費,因此可在消費時按分割槽進行順序消費,保證每個分割槽的訊息具備區域性順序性。由於需要確保分割槽訊息的順序性,並不能併發地消費消費,對消費的吞吐量會造成一定的影響。那麼,如何在保證訊息順序性的前提下,最大限度的提高消費者的消費能力?

本文將會對 Kafka 消費者拉取訊息流程進行深度分析之後,對 Kafka 消費者順序消費執行緒模型進行一次實踐與優化。

Kafka 消費者拉取訊息流程分析

在講實現 Kafka 順序消費執行緒模型之前,我們需要先深入分析 Kafka 消費者的訊息拉取機制,只有當你對 Kafka 消費者拉取訊息的整個流程有深入的瞭解之後,你才能夠很好地理解本次執行緒模型改造的方案。

我先給大家模擬一下訊息拉取的實際現象,這裡 max.poll.records = 500。

1、訊息沒有堆積時:

可以發現,在訊息沒有堆積時,消費者拉取時,如果某個分割槽沒有的訊息不足 500 條,會從其他分割槽湊夠 500 條後再返回。

2、多個分割槽都有堆積時:

在訊息有堆積時,可以發現每次返回的都是同一個分割槽的訊息,但經過不斷 debug,消費者在拉取過程中並不是等某個分割槽消費完沒有堆積了,再拉取下一個分割槽的訊息,而是不斷迴圈的拉取各個分割槽的訊息,但是這個迴圈並不是說分割槽 p0 拉取完 500 條,後面一定會拉取分割槽 p1 的訊息,很有可能後面還會拉取 p0 分割槽的訊息,為了弄明白這種現象,我仔細閱讀了相關原始碼。

org.apache.kafka.clients.consumer.KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
  try {
    // poll for new data until the timeout expires
    do {
      // 客戶端拉取訊息核心邏輯
      final Map<TopicPartition, List<ConsumerRecord<
K, V>
>> records = pollForFetches(timer); if (!records.isEmpty()) { // 在返回資料之前, 傳送下次的 fetch 請求, 避免使用者在下次獲取資料時執行緒阻塞 if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { // 呼叫 ConsumerNetworkClient#poll 方法將 FetchRequest 傳送出去。 client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); } finally { release(); } }

我們使用 Kafka consumer 進行消費的時候通常會給一個時間,比如:

consumer.poll(Duration.ofMillis(3000));

從以上程式碼邏輯可以看出來,使用者給定的這個時間,目的是為了等待訊息湊夠 max.poll.records 條訊息後再返回,即使訊息條數不夠 max.poll.records 訊息,時間到了使用者給定的等待時間後,也會返回。

pollForFetches 方法是客戶端拉取訊息核心邏輯,但並不是真正去 broker 中拉取,而是從快取中去獲取訊息。在 pollForFetches 拉取訊息後,如果訊息不為零,還會呼叫 fetcher.sendFetches() 與 client.pollNoWakeup(),呼叫這兩個方法究竟有什麼用呢?

fetcher.sendFetches() 經過原始碼閱讀後,得知該方法目的是為了構建拉取請求 FetchRequest 並進行傳送,但是這裡的傳送並不是真正的傳送,而是將 FetchRequest 請求物件存放在 unsend 快取當中,然後會在 ConsumerNetworkClient#poll 方法呼叫時才會被真正地執行傳送。

fetcher.sendFetches() 在構建 FetchRequest 前,會對當前可拉取分割槽進行篩選,而這個也是決定多分割槽拉取訊息規律的核心,後面我會講到。

從 KafkaConsumer#poll 方法原始碼可以看出來,其實 Kafka 消費者在拉取訊息過程中,有兩條執行緒在工作,其中使用者主執行緒呼叫 pollForFetches 方法從快取中獲取訊息消費,在獲取訊息後,會再呼叫 ConsumerNetworkClient#poll 方法從 Broker 傳送拉取請求,然後將拉取到的訊息快取到本地,這裡為什麼在拉取完訊息後,會主動呼叫 ConsumerNetworkClient#poll 方法呢?我想這裡的目的是為了下次 poll 的時候可以立即從快取中拉取訊息。

pollForFetches 方法會呼叫 Fetcher#fetchedRecords 方法從快取中獲取並解析訊息:

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
  Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
  int recordsRemaining = maxPollRecords;
  try {
    while (recordsRemaining > 0) {
      // 如果當前獲取訊息的 PartitionRecords 為空,或者已經拉取完畢
      // 則需要從 completedFetches 重新獲取 completedFetch 並解析成 PartitionRecords
      if (nextInLineRecords == null || nextInLineRecords.isFetched) {
        // 如果上一個分割槽快取中的資料已經拉取完了,直接中斷本次迴圈拉取,並返回空的訊息列表
        // 直至有快取資料為止
        CompletedFetch completedFetch = completedFetches.peek();
        if (completedFetch == null) break;
        try {
          // CompletedFetch 即拉取訊息的本地快取資料
          // 快取資料中 CompletedFetch 解析成 PartitionRecords
          nextInLineRecords = parseCompletedFetch(completedFetch);
        } catch (Exception e) {
          // ...
        }
        completedFetches.poll();
      } else {
        // 從分割槽快取中獲取指定條數的訊息
        List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
        // ...
        fetched.put(partition, records);
        recordsRemaining -= records.size();
      }
    }
  }
} catch (KafkaException e) {
  // ...
}
return fetched;
}

completedFetches 是拉取到的訊息快取,以上程式碼邏輯就是圍繞著如何從 completedFetches 快取中獲取訊息的,從以上程式碼邏輯可以看出:

maxPollRecords 為本次拉取的最大訊息數量,該值可通過 max.poll.records 引數配置,預設為 500 條,該方法每次從 completedFetches 中取出一個 CompletedFetch 並解析成可以拉取的 PartitionRecords 物件,即方法中的 nextInLineRecords,請注意,PartitionRecords 中的訊息數量可能大與 500 條,因此可能本次可能一次性從 PartitionRecords 獲取 500 條訊息後即返回,如果 PartitionRecords 中訊息數量不足 500 條,會從 completedFetches 快取中取出下一個要拉取的分割槽訊息,recordsRemaining 會記錄本次剩餘還有多少訊息沒拉取,通過迴圈不斷地從 completedFetches 快取中取訊息,直至 recordsRemaining 為 0。

以上程式碼即可解釋為什麼訊息有堆積的情況下,每次拉取的訊息很大概率是同一個分割槽的訊息,因為快取 CompletedFetch 快取中的訊息很大概率會多餘每次拉取訊息數量,Kafka 客戶端每次從 Broker 拉取的訊息資料並不是通過 max.poll.records 決定的,該引數僅決定使用者每次從本地快取中獲取多少條資料,真正決定從 Broker 拉取的訊息資料量是通過 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等引數決定的。

我們再想一下,假設某個分割槽的訊息一直都處於堆積狀態,Kafka 會每次都拉取這個分割槽直至將該分割槽消費完畢嗎?(根據假設,Kafka 消費者每次都會從這個分割槽拉取訊息,並將訊息存到分割槽關聯的 CompletedFetch 快取中,根據以上程式碼邏輯,nextInLineRecords 一直處於還沒拉取完的狀態,導致每次拉取都會從該分割槽中拉取訊息。)

答案顯然不會,不信你開啟 Kafka-manager 觀察每個分割槽的消費進度情況,每個分割槽都會有消費者在消費中。

那 Kafka 消費者是如何迴圈地拉取它監聽的分割槽呢?我們接著往下分析。

傳送拉取請求邏輯:

org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches

public synchronized int sendFetches() {
  // 解析本次可拉取的分割槽
  Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
  for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
    final Node fetchTarget = entry.getKey();
    final FetchSessionHandler.FetchRequestData data = entry.getValue();
    // 構建請求物件
    final FetchRequest.Builder request = FetchRequest.Builder
      .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
      .isolationLevel(isolationLevel)
      .setMaxBytes(this.maxBytes)
      .metadata(data.metadata())
      .toForget(data.toForget());
    // 傳送請求,但不是真的傳送,而是將請求儲存在 unsent 中
    client.send(fetchTarget, request)
      .addListener(new RequestFutureListener<ClientResponse>() {
        @Override
        public void onSuccess(ClientResponse resp) {
          synchronized (Fetcher.this) {

            // ... ...

            // 建立 CompletedFetch, 並快取到 completedFetches 佇列中
            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                                    resp.requestHeader().apiVersion()));
          }

        }
      }
                   // ... ...
                   });
  }
  return fetchRequestMap.size();
}

以上程式碼邏輯很好理解,在傳送拉取請求前,先檢查哪些分割槽可拉取,接著為每個分割槽構建一個 FetchRequest 物件,FetchRequest 中的 minBytes 和 maxBytes,分別可通過 fetch.min.bytes 和 fetch.max.bytes 引數設定。這也是每次從 Broker 中拉取的訊息不一定等於 max.poll.records 的原因。

prepareFetchRequests 方法會呼叫 Fetcher#fetchablePartitions 篩選可拉取的分割槽,我們來看下 Kafka 消費者是如何進行篩選的:

org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions

private List<TopicPartition> fetchablePartitions() {
  Set<TopicPartition> exclude = new HashSet<>();
  List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
  if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
    exclude.add(nextInLineRecords.partition);
  }
  for (CompletedFetch completedFetch : completedFetches) {
    exclude.add(completedFetch.partition);
  }
  fetchable.removeAll(exclude);
  return fetchable;
}

nextInLineRecords 即我們上面提到的根據某個分割槽快取 CompletedFetch 解析得到的,如果 nextInLineRecords 中的快取還沒拉取完,則不從 broker 中拉取訊息了,以及如果此時 completedFetches 快取中存在該分割槽的快取,也不進行拉取訊息。

我們可以很清楚的得出結論:

當快取中還存在中還存在某個分割槽的訊息資料時,消費者不會繼續對該分割槽進行拉取請求,直到該分割槽的本地快取被消費完,才會繼續傳送拉取請求。

為了更加清晰的表達這段邏輯,我舉個例子並將整個流程用圖表達出來:

假設某消費者監聽三個分割槽,每個分割槽每次從 Broker 中拉取 4 條訊息,使用者每次從本地快取中獲取 2 條訊息:

從以上流程可看出,Kafka 消費者自身已經實現了拉取限流的機制。

Kafka 順序消費執行緒模型的實現

kafka 的消費類 KafkaConsumer 是非執行緒安全的,因此使用者無法在多執行緒中共享一個 KafkaConsumer 例項,且 KafkaConsumer 本身並沒有實現多執行緒消費邏輯,如需多執行緒消費,還需要使用者自行實現,在這裡我會講到 Kafka 兩種多執行緒消費模型:

1、每個執行緒維護一個 KafkaConsumer

這種消費模型建立多個 KafkaConsumer 物件,每個執行緒維護一個 KafkaConsumer,從而實現執行緒隔離消費,由於每個分割槽同一時刻只能有一個消費者消費,所以這種消費模型天然支援順序消費。

但是缺點是無法提升單個分割槽的消費能力,如果一個主題分割槽數量很多,只能通過增加 KafkaConsumer 例項提高消費能力,這樣一來執行緒數量過多,導致專案 Socket 連線開銷巨大,專案中一般不用該執行緒模型去消費。

2、單 KafkaConsumer 例項 + 多 worker 執行緒

這種消費模型獎 KafkaConsumer 例項與訊息消費邏輯解耦,我們不需要建立多個 KafkaConsumer 例項就可進行多執行緒消費,還可根據消費的負載情況動態調整 worker 執行緒,具有很強的獨立擴充套件性,在公司內部使用的多執行緒消費模型就是用的單 KafkaConsumer 例項 + 多 worker 執行緒模型。但是通常情況下,這種消費模型無法保證消費的順序性。

那麼,如果在使用第二種消費模型的前提下,實現訊息順序消費呢?

接下來我們來看下 ZMS 是怎麼實現順序消費執行緒模型的,目前 ZMS 的順序消費執行緒模型為每個分割槽單執行緒消費模式:

com.zto.consumer.KafkaConsumerProxy#addUserDefinedProperties

首先在初始化的時候,會對消費執行緒池進行初始化,具體是根據 threadsNumMax 的數量建立若干個單個執行緒的執行緒池,單個執行緒的執行緒池就是為了保證每個分割槽取模後拿到執行緒池是序列消費的,但這裡建立 threadsNumMax 個執行緒池是不合理的,後面我會說到。

com.zto.consumer.KafkaConsumerProxy#submitRecords

ZMS 會對訊息分割槽進行取模,根據取模後的序號從執行緒池列表快取中獲取一個執行緒池,從而使得相同分割槽的訊息會被分配到相同執行緒池中執行,對於順序消費來說至關重要,前面我也說了,當用戶配置了順序消費時,每個執行緒池只會分配一個執行緒,如果相同分割槽的訊息分配到同一個執行緒池中執行,也就意味著相同分割槽的訊息會序列執行,實現訊息消費的順序性。

為了保證手動提交位移的正確性,我們必須保證本次拉取的訊息消費完之後才會進行位移提交,因此 ZMS 在消費前會建立一個 count 為本次訊息數量的 CountDownLatch:

final CountDownLatch countDownLatch = new CountDownLatch(records.count());

消費邏輯中,在 finally 進行 countDown 操作,最後會在本次消費主執行緒當中阻塞等待本次訊息消費完成:

com.zto.consumer.KafkaConsumerProxy#submitRecords

以上就是目前 ZMS 順序消費的執行緒模型,用圖表示以上程式碼邏輯:

以上,由於某些分割槽的訊息堆積量少於 500 條(Kafka 預設每次從 Broker 拉取 500 條訊息),因此會繼續從其它分割槽湊夠 500 條訊息,此時拉取的 500 條訊息會包含 3 個分割槽的訊息,ZMS 根據利用分割槽取模將同一個分割槽的訊息放到指定的執行緒池中(執行緒池只有一條執行緒)進行消費,以上圖來看,總共有 3 條執行緒在消費本次拉取的 500 條訊息。

那如果每個分割槽的積壓都超過了 500 條訊息呢?這種實際的情況會更加多,因為訊息中介軟體其中一個重要功能就是用於流量削峰,流量洪峰那段時間積壓幾百上千萬條訊息還是經常能夠遇到的,那麼此時每次拉取的訊息中,很大概率就只剩下一個分割槽了,我用如下圖表示:

在訊息流量大的時候,順序訊息消費時卻退化成單執行緒消費了。

如何提高 Kafka 順序消費的併發度?

經過對 ZMS 的消費執行緒模型以及對 Kafka 消費者拉取訊息流程的深入瞭解之後,我想到了如下幾個方面對 ZMS 的消費執行緒模型進行優化:

1、細化訊息順序粒度

之前的做法是將每個分割槽單獨一條執行緒消費,無法再繼續在分割槽之上增加消費能力,我們知道業務方傳送順序訊息時,會將同一型別具有順序性的訊息給一個相同的 Key,以保證這類訊息傳送到同一個分割槽進行消費,從而達到訊息順序消費的目的,而同一個分割槽會接收多種型別(即不同 Key)的訊息,每次拉取的訊息具有很大可能是不同型別的,那麼我們就可以將同一個分割槽的訊息,分配一個獨立的執行緒池,再利用訊息 Key 進行取模放入對應的執行緒中消費,達到併發消費的目的,且不打亂訊息的順序性。

2、細化位移提交粒度

由於 ZMS 目前是手動提交位移,目前每次拉取訊息必須先消費完才能進行位移提交,既然已經對分割槽訊息進行指定的執行緒池消費了,由於分割槽之間的位移先後提交不影響,那麼我們可以將位移提交交給每個分割槽進行管理,這樣拉取主執行緒不必等到是否消費完才進行下一輪的訊息拉取。

3、非同步拉取與限流

非同步拉取有個問題,就是如果節點消費跟不上,而拉取訊息過多地儲存在本地,很可能會造成記憶體溢位,因此我們需要對訊息拉取進行限流,當本地訊息快取量達到一定量時,阻止訊息拉取。

上面在分析 Kafka 消費者拉取訊息流程時,我們知道消費者在傳送拉取請求時,首先會判斷本地快取中是否存在該分割槽的快取,如果存在,則不傳送拉取請求,但由於 ZMS 需要改造成非同步拉取的形式,由於 Comsumer#poll 不再等待訊息消費完再進行下一輪拉取,因此 Kafka 的本地快取中幾乎不會存在資料了,導致 Kafka 每次都會發送拉取請求,相當於將 Kafka 的本地快取放到 ZMS 中,因此我們需要 ZMS 層面上對訊息拉取進行限流,Kafka 消費者有兩個方法可以設定訂閱的分割槽是否可以傳送拉取請求:

// 暫停分割槽消費(即暫停該分割槽傳送拉取訊息請求)
org.apache.kafka.clients.consumer.KafkaConsumer#pause
// 恢復分割槽消費(即恢復該分割槽傳送拉取訊息請求)
org.apache.kafka.clients.consumer.KafkaConsumer#resume

以上兩個方法,其實就是改變了消費者的訂閱分割槽的狀態值 paused,當 paused = true 時,暫停分割槽消費,當 paused = false 時,恢復分割槽消費,這個引數是在哪裡使用到呢?上面在分析 Kafka 消費者拉取訊息流程時我們有提到傳送拉取請求之前,會對可拉取的分割槽進行篩選,其中一個條件即分割槽 paused = false:

org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable

private boolean isFetchable() {
  return !paused && hasValidPosition();
}

由於 KafkaConsumer 是非執行緒安全的,如果我們在非同步執行緒 KafkaConsumer 相關的類,會報如下錯誤:

KafkaConsumer is not safe for multi-threaded access

只需要確保 KafkaConsumer 相關方法在 KafkaConsumer#poll 方法執行緒中呼叫即可,具體做法可以設定一個執行緒安全上下文容器,非同步執行緒操作 KafkaConsumer 相關方法是,只需要將具體的分割槽放到上下文容器即可,後續統一由 poll 執行緒執行。

因此我們只需要利用好這個特性,就可以實現拉取限流,消費者主執行緒的 Comsumer#poll 方法依然是非同步不斷地從快取中獲取訊息,同時不會造成兩次 poll 之間的時間過大導致消費者被踢出消費組。

以上優化改造的核心是在不打亂訊息順序的前提下利用訊息 Key 儘可能地併發消費,但如果遇到分割槽中的訊息都是相同 Key,並且在有一定的積壓下每次拉取都是同一個分割槽的訊息時,以上模型可能沒有理想情況下的那麼好。這時是否可以將 fetch.max.bytes 與 max.partition.fetch.bytes 引數設定小一點,讓每個分割槽的本地快取都不足 500 條,這樣每次 poll 的訊息列表都可以包含多個分割槽的訊息了,但這樣又會導致 RPC 請求增多,這就需要針對業務訊息大小,對這些引數進行調優。

以上執行緒模型,需要增加一個引數 orderlyConsumePartitionParallelism,用於設定分割槽消費並行度,假設某個消費組被分配 5 個分割槽進行消費,則每個分割槽預設啟動一條執行緒消費,一共 5 * 1 = 5 條消費執行緒,當 orderlyConsumePartitionParallelism = 3,則每個分割槽啟動 3 條執行緒消費,一共 5 * 3 = 15 條消費執行緒。orderlyConsumePartitionParallelism = 1 時,則說明該分割槽所有訊息都處在順序(序列)消費;當 orderlyConsumePartitionParallelism > 1 時,則根據分割槽訊息的 Key 進行取模分配執行緒消費,保證不了整個分割槽順序消費,但保證相同 Key 的訊息順序消費。

注意,當 orderlyConsumePartitionParallelism > 1 時,分割槽消費執行緒的有效使用率取決於該分割槽訊息的 Key:

1、如果該分割槽所有訊息的 Key 都相同,則消費的 Key 取模都分配都同一條執行緒當中,並行度退化成 orderlyConsumePartitionParallelism = 1;

2、如果該分割槽相同 Key 的訊息過於集中,會導致每次拉取都是相同 key 的一批訊息,同樣並行度退化成 orderlyConsumePartitionParallelism = 1。

綜合對比:

優化前,ZMS 可保證整個分割槽訊息的順序性,優化後可根據訊息 Key 在分割槽的基礎上不打亂相同 Key 訊息的順序性前提下進行併發消費,有效地提升了單分割槽的消費吞吐量;優化前,有很大的概率會退化成同一時刻單執行緒消費,優化後儘可能至少保證每個分割槽一條執行緒消費,情況好的時候每個分割槽可多條執行緒消費。

通過以上場景分析,該優化方案不是提高順序消費吞吐量的銀彈,它有很大的侷限性,使用者在業務的實現上不能重度依賴順序消費去實現,以免影響業務效能上的需求。

總結

通過本文深度分析,我們已經認識到順序訊息會給消費吞吐量帶來怎麼樣的影響,因此使用者在業務的實現上不能重度依賴順序消費去實現,能避免則避免,如果一定要使用到順序消費,需要知道 Kafka 並不能保證嚴格的順序消費,在消費組重平衡過程中很可能就會將訊息的順序性打亂,而且順序消費會影響消費吞吐量,使用者需要權衡這種需求的利弊。

寫在最後

我們知道 RocketMQ 本身已經實現了具體的消費執行緒模型,使用者不需要關心具體實現,只需要實現訊息消費邏輯即可,而 Kafka 訊息者僅提供 KafkaConsumer#poll 一個方法,消費執行緒模型的實現則完全交由使用者去實現。

中通科技正式開源內部的訊息 Pass 雲平臺化產品 ZMS(ZTO Messaging Service),它可以遮蔽底層訊息中介軟體型別,封裝了包括 Kafka 消費線模型在內的具體實現,提供統一的對外 API,彌補了 Kafka 消費者這部分支援的不足。同時還提供了通過唯一標識動態路由訊息,為開發運維人員提供自動化部署運維叢集,主題、消費組申請與審批、實時監控、自動告警、容災遷移等功能。

同時希望更多的開源愛好者加入到該專案中,共同打造一體化的智慧訊息運維平臺。

ZMS GitHub 地址:https://github.com/ZTO-Express/zms

作者簡介

作者張乘輝,擅長訊息中介軟體技能,負責公司百萬 TPS 級別 Kafka 叢集的維護,作者維護的公號「後端進階」不定期分享 Kafka、RocketMQ 系列不講概念直接真刀真槍的實戰總結以及細節上的原始碼分析;同時作者也是阿里開源分散式事務框架 Seata Contributor,因此也會分享關於 Seata 的相關知識;當然公號也會分享 WEB 相關知識比如 Spring 全家桶等。內容不一定面面俱到,但一定讓你感受到作者對於技術的追求是認真的!

公眾號:後端進階

技術部落格:https://objcoding.com/

GitHub:https://github.com/objcoding/