1. 程式人生 > >訊息佇列掃盲篇

訊息佇列掃盲篇

一、市面上流行的訊息佇列對比

  • ActivityMQ activityMQ是老牌的訊息佇列,技術相對成熟,但吞吐量一般,目前行業趨勢漸漸用的少了,社群也相比其他mq不夠活躍。
  • RabbitMQ,rabbitMQ是基於erlang語言開發的,國內中小企業比較流行,功能完備,特別值得一提的是管理後臺介面足夠人性化,功能豐富。並且社群活躍度較高,吞吐量還行。
  • RocketMQ 吞吐量比RabbitMq高,阿里開發,基於java語言,功能強大。
  • kafkaMQ 吞吐量比rocketMQ還要高,架構足夠輕,易於擴充套件,是大資料實時計算、日誌採集領域使用訊息中介軟體的標準,但功能較少,並沒有"事務性""訊息傳輸擔保(訊息確認機制)""訊息分組"等企業級特性。

總結看來,技術選型中,activityMQ一般不推薦,中小型公司用rabbitMQ(功能完備,使用較簡單,有很好的後臺介面)。 rocketMQ適合大型公司,大資料領域適合用kafka.

二、為什麼使用訊息佇列

1. 解耦

相比呼叫而言,訊息佇列有釋出訂閱模式(PUB/SUB),原來場景中的呼叫方只需要傳送訊息,不用考慮誰消費,不用手動維護呼叫關係,如果需要資料的系統太多維護起來是很複雜的。

2. 非同步

訊息佇列常用於非同步場景,以此減少響應時間。

3. 削峰

應用訊息佇列的拉模式能夠很好的緩解系統在高峰期的壓力,消費者保持自己能夠接收限度去拉取訊息。

三、訊息佇列可能導致的問題

1.可用性問題(MQ崩潰則整個系統崩潰)

以rabbitMQ為例,rabbitMQ有三種模式:單機模式、普通叢集模式、映象叢集模式。

單機模式

只有一個mq服務節點,沒有可用性可言

普通叢集模式

一個queue不會存多個副本在每個節點,真實資料只儲存在一個節點上,但其他節點會儲存該queue的元資料元(儲存佇列指標,長度等),消費者可以通過叢集中的任何節點獲取到資料(訪問的節點通過元資料取到實際儲存佇列的節點的資料然後返回給消費者)。但真實資料只在一個節點中,如果配置了持久化,那麼實際儲存資料的節點蹦了,其他節點不能建立同樣的佇列,所以不符合高可用性。這種模式優點只是在於分散mq的cpu,記憶體壓力,提高資料儲存空間。但也會存在很多叢集內部各個節點為了傳輸訊息的網路IO,同時可用性沒有保障。

映象叢集模式

相對普通叢集模式,所有節點同步儲存佇列的真實資料。管理控制檯可以配置一個映象叢集策略,指定所有節點或指定數量節點同步節點佇列。缺點是降低了系統性能,節點每次同步資料的網路開銷大。master提供對外服務,slave節點只提供備份服務,需要注意的是,並不是一個節點就是所有佇列的master節點。談論master和slave是針對佇列來談論的。

以Kafka為例:

每個topic有多個partion,每個partion分佈於不同的節點,而且partion有副本存在於其他節點,每個partion有leader和follow,讀寫只存在於leader,follow會自動同步leader的資料,如果leader掛了,將重新選舉一個leader。

2.訊息被重複消費

rabbitMq場景:消費確認機制,如果設定了手動提交,消費者收到訊息突然蹦了,導致ACK狀態碼未反饋至MQ

kafka場景:消費者會定期執行消費到的offset值提交給zookeeper用於給kafka節點從哪個位置傳送訊息為依據,但是由於極端原因如重啟導致沒有消費了訊息但是沒有提交,kafka任會按照上一次offset的位置傳送訊息給消費者,這就導致了重複消費

另外一種場景是:原本是想傳送廣播訊息到多個消費者服務中,但是一個服務例項部署了多臺,多臺重複消費了。

解決方案:設計消費方法為冪等的。這個思路可以是在redis儲存消費過後的一個標誌,下次消費就先判斷該標誌存不存在,存在則不進行操作。或者基於資料庫唯一鍵實現,報錯就報錯。

3.訊息積壓

場景:消費端故障導致訊息積壓

解決方案:

  • 快速排查好消費端故障
  • 改造原來的消費端,寫到別的伺服器的佇列中,然後臨時多開幾個消費端按照原有邏輯去消費這些佇列的資料
  • 如果還設定了過期失效(一般情況下不會這麼設定),部分資料丟了怎麼辦,手動寫程式把丟掉的資料查出來再手動發到訊息佇列中去

4.訊息丟失

以rabbitMQ為例,訊息丟失可以分為:

(1)生產者丟訊息

主要是因為,寫訊息的時候由於網路原因沒有到rabbitmq伺服器。

解決方案:

1.一是可以開啟rabbitmq的事務模式

一次事務互動主要有以下環節:

客戶端傳送給伺服器Tx.Select(開啟事務模式)

伺服器端返回Tx.Select-Ok(開啟事務模式ok)

推送訊息

客戶端傳送給事務提交Tx.Commit

伺服器端返回Tx.Commit-Ok

以上就完成了事務的互動流程,如果其中任意一個環節出現問題,就會丟擲IoException移除,這樣使用者就可以攔截異常進行事務回滾,或決定要不要重複訊息。但是這樣的缺點相比下面說的confirm機制,事務是同步的,吞吐量會低一點。

2.二是可以用confirm機制

流程是:

先把channel設定為confirm模式,傳送訊息,發完就不管了

生產者提供一個介面,用於實現成功/失敗回撥後的方法

接收失敗的話可以直接再重發一次

(2)MQ丟訊息

MQ丟訊息的情況可能是:rabbitmq接收到訊息,在消費者消費之前掛掉了。

解決方案(兩步):

1.設定佇列元資料持久化,設為durable;

2.生產者設定訊息持久化,delivery_mode=2。

另外:

若設定了持久化且開啟了confirm,將在持久化之後才回調生產者。

若持久化過程中宕機,還是會丟失資料,除非結合confirm機制。

(3)消費者丟訊息

消費者打開了autoAck機制(消費者收到訊息自動響應mq消費到了資料),如果正在消費時宕機了,就丟訊息了。

解決方案:關閉ack改為手動ack(如果要在意最終資料一致性,最好是在資料庫本地事務之後手動傳送ack),如果mq沒收到ack,mq將把訊息傳送給其他消費者。

以kafka為例:

(1) 生產者丟訊息

如果設定了ack=all,一定不會丟,同時設定了retries=max,則會無限重試

(2) MQ丟訊息

場景:有可能記憶體還沒同步到日誌檔案或者沒有同步到Follower但是自己down掉了。

參考解決方案:

topic設定repication.factor引數,必須大於1,要求每個Partition必須至少有2個副本。

kafka服務端設定min.insync.replicas大於1,要求一個leader至少需要感知到一個follower還跟自己保持聯絡。

生產者設定acks=all,要求每條資料必須是寫入所有replica之後,才能認為是寫成功了。

在producer端設定retries=max(很大的值),要求一旦寫入失敗,就無線重試。

(3)消費者丟訊息

消費者自動提交了offset,但是處理業務中卻異常宕機。取消offset,改為手動提交。

3.訊息順序變了

訊息順序有時候會影響業務,比如先修改後刪除的兩條訊息,不按順序來消費就會出錯。

從rabbitMQ上來說,需要保證順序的佇列避免多個消費者同時消費(避免工作佇列,只有一個消費者去消費)。

從kafka上來說,多個消費者同時消費的訊息指定到了不同的key分發到了不同的partion導致不同消費者同時消費。因為kafka的一個partion只允許一個消費者,所以只需要把保證順序的訊息傳送到同一個pation,讓其被同一個消費者消費就行了。

另外也有可能是消費者自己併發的去消費資訊造成的,針對這種情況,我們可以通過對訊息的唯一標識進行hash演算法分派到不同的記憶體佇列,然後執行緒只取其中一個記憶體佇列的訊息進行消費來控制相關聯的訊息順序不會亂。

4.一致性問題

一般使用訊息佇列只要配置對,使用訊息確認機制,是能夠保證一條訊息的可靠投遞的,問題主要在於如果一個事務發起方傳送了多條訊息,那麼多條訊息的最終一致性怎麼解決,針對此rocketMq有成熟的解決方案,但其他訊息佇列沒有,我們可以自己寫一箇中間的訊息服務維護這種確認機制(