1. 程式人生 > 實用技巧 >訊息佇列(基礎篇)- 7 訊息積壓了該如何處理

訊息佇列(基礎篇)- 7 訊息積壓了該如何處理

我們都知道,訊息積壓的直接原因,一定是系統中的某個部分出現了效能問題,來不及處理上游傳送的訊息,才會導致訊息積壓。

所以,我們先來分析下,在使用訊息佇列時,如何來優化程式碼的效能,避免出現訊息積壓。然後再來看看,如果你的線上系統出現了訊息積壓,該如何進行緊急處理,最大程度地避免訊息積壓對業務的影響。

優化效能來避免訊息積壓

在使用訊息佇列的系統中,對於效能的優化,主要體現在生產者和消費者這一收一發兩部分的業務邏輯中。對於訊息佇列本身的效能,你作為使用者,不需要太關注。為什麼這麼說呢?

主要原因是,對於絕大多數使用訊息佇列的業務來說,訊息佇列本身的處理能力要遠大於業務系統的處理能力。主流訊息佇列的單個節點,訊息收發的效能可以達到每秒鐘處理幾萬至幾十萬條訊息的水平,還可以通過水平擴充套件 Broker 的例項數成倍地提升處理能力。

而一般的業務系統需要處理的業務邏輯遠比訊息佇列要複雜,單個節點每秒鐘可以處理幾百到幾千次請求,已經可以算是效能非常好的了。所以,對於訊息佇列的效能優化,我們更關注的是,在訊息的收發兩端,我們的業務程式碼怎麼和訊息佇列配合,達到一個最佳的效能。

1. 傳送端效能優化

傳送端業務程式碼的處理效能,實際上和訊息佇列的關係不大,因為一般傳送端都是先執行自己的業務邏輯,最後再發送訊息。如果說,你的程式碼傳送訊息的效能上不去,你需要優先檢查一下,是不是發訊息之前的業務邏輯耗時太多導致的。

對於傳送訊息的業務邏輯,只需要注意設定合適的併發和批量大小,就可以達到很好的傳送效能。為什麼這麼說呢?

我們之前的課程中講過 Producer 傳送訊息的過程,Producer 發訊息給 Broker,Broker 收到訊息後返回確認響應,這是一次完整的互動。假設這一次互動的平均時延是 1ms,我們把這 1ms 的時間分解開,它包括了下面這些步驟的耗時:

  • 傳送端準備資料、序列化訊息、構造請求等邏輯的時間,也就是傳送端在傳送網路請求之前的耗時;
  • 傳送訊息和返回響應在網路傳輸中的耗時;
  • Broker 處理訊息的時延。

如果是單執行緒傳送,每次只發送 1 條訊息,那麼每秒只能傳送 1000ms / 1ms * 1 條 /ms = 1000 條 訊息,這種情況下並不能發揮出訊息佇列的全部實力。

無論是增加每次傳送訊息的批量大小,還是增加併發,都能成倍地提升傳送效能。至於到底是選擇批量傳送還是增加併發,主要取決於傳送端程式的業務性質。簡單來說,只要能夠滿足你的效能要求,怎麼實現方便就怎麼實現。

比如說,你的訊息傳送端是一個微服務,主要接受 RPC 請求處理線上業務。很自然的,微服務在處理每次請求的時候,就在當前執行緒直接傳送訊息就可以了,因為所有 RPC 框架都是多執行緒支援多併發的,自然也就實現了並行傳送訊息。並且線上業務比較在意的是請求響應時延,選擇批量傳送必然會影響 RPC 服務的時延。這種情況,比較明智的方式就是通過併發來提升傳送效能。

如果你的系統是一個離線分析系統,離線系統在效能上的需求是什麼呢?它不關心時延,更注重整個系統的吞吐量。傳送端的資料都是來自於資料庫,這種情況就更適合批量傳送,你可以批量從資料庫讀取資料,然後批量來發送訊息,同樣用少量的併發就可以獲得非常高的吞吐量。

2. 消費端效能優化

使用訊息佇列的時候,大部分的效能問題都出現在消費端,如果消費的速度跟不上傳送端生產訊息的速度,就會造成訊息積壓。如果這種效能倒掛的問題只是暫時的,那問題不大,只要消費端的效能恢復之後,超過傳送端的效能,那積壓的訊息是可以逐漸被消化掉的。

要是消費速度一直比生產速度慢,時間長了,整個系統就會出現問題,要麼,訊息佇列的儲存被填滿無法提供服務,要麼訊息丟失,這對於整個系統來說都是嚴重故障。

所以,我們在設計系統的時候,一定要保證消費端的消費效能要高於生產端的傳送效能,這樣的系統才能健康的持續執行。

消費端的效能優化除了優化消費業務邏輯以外,也可以通過水平擴容,增加消費端的併發數來提升總體的消費效能。特別需要注意的一點是,在擴容 Consumer 的例項數量的同時,必須同步擴容主題中的分割槽(也叫佇列)數量,確保 Consumer 的例項數和分割槽數量是相等的。如果 Consumer 的例項數量超過分割槽數量,這樣的擴容實際上是沒有效果的。原因我們之前講過,因為對於消費者來說,在每個分割槽上實際上只能支援單執行緒消費。

我見到過很多消費程式,他們是這樣來解決消費慢的問題的:

它收訊息處理的業務邏輯可能比較慢,也很難再優化了,為了避免訊息積壓,在收到訊息的 OnMessage 方法中,不處理任何業務邏輯,把這個訊息放到一個記憶體佇列裡面就返回了。然後它可以啟動很多的業務執行緒,這些業務執行緒裡面是真正處理訊息的業務邏輯,這些執行緒從記憶體佇列裡取訊息處理,這樣它就解決了單個 Consumer 不能並行消費的問題。

這個方法是不是很完美地實現了併發消費?請注意,這是一個非常常見的錯誤方法! 為什麼錯誤?因為會丟訊息。如果收訊息的節點發生宕機,在記憶體佇列中還沒來及處理的這些訊息就會丟失。關於“訊息丟失”問題,回顧https://blog.csdn.net/java_kider/article/details/109014506》。

訊息積壓了該如何處理?

還有一種訊息積壓的情況是,日常系統正常運轉的時候,沒有積壓或者只有少量積壓很快就消費掉了,但是某一個時刻,突然就開始積壓訊息並且積壓持續上漲。這種情況下需要你在短時間內找到訊息積壓的原因,迅速解決問題才不至於影響業務。

導致突然積壓的原因肯定是多種多樣的,不同的系統、不同的情況有不同的原因,不能一概而論。但是,我們排查訊息積壓原因,是有一些相對固定而且比較有效的方法的。

能導致積壓突然增加,最粗粒度的原因,只有兩種:要麼是傳送變快了,要麼是消費變慢了。

大部分訊息佇列都內建了監控的功能,只要通過監控資料,很容易確定是哪種原因。如果是單位時間傳送的訊息增多,比如說是趕上大促或者搶購,短時間內不太可能優化消費端的程式碼來提升消費效能,唯一的方法是通過擴容消費端的例項數來提升總體的消費能力。

如果短時間內沒有足夠的伺服器資源進行擴容,沒辦法的辦法是,將系統降級,通過關閉一些不重要的業務,減少傳送方傳送的資料量,最低限度讓系統還能正常運轉,服務一些重要業務。

還有一種不太常見的情況,你通過監控發現,無論是傳送訊息的速度還是消費訊息的速度和原來都沒什麼變化,這時候你需要檢查一下你的消費端,是不是消費失敗導致的一條訊息反覆消費這種情況比較多,這種情況也會拖慢整個系統的消費速度。

如果監控到消費變慢了,你需要檢查你的消費例項,分析一下是什麼原因導致消費變慢。優先檢查一下日誌是否有大量的消費錯誤,如果沒有錯誤的話,可以通過列印堆疊資訊,看一下你的消費執行緒是不是卡在什麼地方不動了,比如觸發了死鎖或者卡在等待某些資源上了。

小結

這節課我們主要討論了 2 個問題,一個是如何在訊息佇列的收發兩端優化系統性能,提前預防訊息積壓。另外一個問題是,當系統發生訊息積壓了之後,該如何處理。

優化訊息收發效能,預防訊息積壓的方法有兩種,增加批量或者是增加併發,在傳送端這兩種方法都可以使用,在消費端需要注意的是,增加併發需要同步擴容分割槽數量,否則是起不到效果的。

對於系統發生訊息積壓的情況,需要先解決積壓,再分析原因,畢竟保證系統的可用性是首先要解決的問題。快速解決積壓的方法就是通過水平擴容增加 Consumer 的例項數量。