1. 程式人生 > >RabbitMQ 訊息順序、訊息冪等、訊息重複、訊息事務、叢集

RabbitMQ 訊息順序、訊息冪等、訊息重複、訊息事務、叢集

1. 訊息順序

場景:比如下單操作,下單成功之後,會發布建立訂單和扣減庫存訊息,但扣減庫存訊息執行會先於建立訂單訊息,也就說前者執行成功之後,才能執行後者。

不保證完全按照順序消費,在 MQ 層面支援訊息的順序處理開銷太大,為了極少量的需求,增加整體上的複雜度得不償失。

所以,還是在應用層面處理比較好,或者業務邏輯進行處理

應用層解決方式:

  • 1. 訊息實體中增加:版本號 & 狀態機 & msgid & parent_msgid,通過 parent_msgid 判斷訊息的順序(需要全域性儲存,記錄訊息的執行狀態)。
  • 2. “同步執行”:當一個訊息執行完之後,再發佈下一個訊息。

2. 訊息冪等、訊息重複、訊息事務

訊息重複

造成訊息重複的根本原因是:網路不可達。只要通過網路交換資料,就無法避免這個問題。所以解決這個問題的辦法就是繞過這個問題。那麼問題就變成了:如果消費端收到兩條一樣的訊息,應該怎樣處理?

消費端處理訊息的業務邏輯保持冪等性。

保證每條訊息都有唯一編號且保證訊息處理成功與去重表的日誌同時出現。

第 1 條很好理解,只要保持冪等性,不管來多少條重複訊息,最後處理的結果都一樣。第 2 條原理就是利用一張日誌表來記錄已經處理成功的訊息的 ID,如果新到的訊息 ID 已經在日誌表中,那麼就不再處理這條訊息。

第 1 條解決方案,很明顯應該在消費端實現,不屬於訊息系統要實現的功能。第 2 條可以訊息系統實現,也可以業務端實現。正常情況下出現重複訊息的概率其實很小,如果由訊息系統來實現的話,肯定會對訊息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理訊息重複的問題,這也是 RabbitMQ 不解決訊息重複的問題的原因。

RabbitMQ 不保證訊息不重複,如果你的業務需要保證嚴格的不重複訊息,需要你自己在業務端去重。

AMQP 消費者確認機制

AMQP 定義了消費者確認機制(message ack),如果一個消費者應用崩潰掉(此時連線會斷掉,broker 會得知),但是 broker 尚未獲得 ack,那麼訊息會被重新放入佇列。所以 AMQP 提供的是“至少一次交付”(at-least-once delivery),異常情況下,訊息會被重複消費,此時業務要實現冪等性(重複訊息處理)。

AMQP 生產者事務

對於生產者,AMQP 定義了事務(tx transaction)來確保生產訊息被 broker 接收併成功入隊。TX 事務是阻塞呼叫

,生產者需等待broker寫磁碟後返回的確認,之後才能繼續傳送訊息。事務提交失敗時(如broker宕機場景),broker並不保證提交的訊息全部入隊。

TX 的阻塞呼叫使 broker 的效能非常差,RabbitMQ 使用 confirm 機制來優化生產訊息的確認。Confirm 模式下,生產者可以持續傳送訊息,broker 將訊息批量寫磁碟後回覆確認,生產者通過確認訊息的ID來確定哪些已傳送訊息被成功接收。Confirm 模式下生產者傳送訊息和接受確認是非同步流程,生產者需要快取未確認的訊息以便出錯時重新發送。

總結

  • 1. 訊息重複釋出:不存在,因為 AMQP 定義了事務(tx transaction)來確保生產訊息被 broker 接收併成功入隊。TX 事務是阻塞呼叫,生產者需等待 broker 寫磁碟後返回的確認,之後才能繼續傳送訊息。事務提交失敗時(如 broker 宕機場景),broker 並不保證提交的訊息全部入隊。RabbitMQ 使用 confirm 機制來優化生產訊息的確認(可以持續釋出訊息,但會批量回復確認)。
  • 2. 訊息重複消費:AMQP 提供的是“至少一次交付”(at-least-once delivery),異常情況下,訊息會被重複消費,此時業務要實現冪等性(重複訊息處理)。

應用層解決方式:

  • 1. 專門的 Map 儲存:用來儲存每個訊息的執行狀態(用 msgid 區分),執行成功之後更新 Map,有另外訊息重複消費的時候,讀取 Map 資料判斷 msgid 對應的執行狀態,已消費則不執行。
  • 2. 業務邏輯判斷:訊息執行完會更改某個實體狀態,判斷實體狀態是否更新,如果更新,則不進行重複消費。

特別說明:AMQP 協議中的事務僅僅是指生產者傳送訊息給 broker 這一系列流程處理的事務機制,並不包含消費端的處理流程。

3. 叢集

原 RabbitMQ 叢集:manager1、manager2、manager3 節點均為磁碟儲存,manager1 為主節點,HAProxy 負載三個節點。

現 RabbitMQ 叢集更新(更合理的配置):

  • 1. RabbitMQ 叢集更新:manager1、manager2 節點型別改為 ram(記憶體儲存),manager3 節點型別為 disc(磁碟儲存,用於儲存叢集配置和元資料),主節點變更為 manager3。
  • 2. HAProxy 負載更新:移除 manager3 負載(5672 埠),只保留 manage2、manager2 負載。

4. Kafka 和 RabbitMQ 對比

Kafka 應對場景:訊息持久化、吞吐量是第一要求、狀態由客戶端維護、必須是分散式的。Kafka 認為 broker 不應該阻塞生產者,高效的磁碟順序讀寫能夠和網路 IO 一樣快,同時依賴現代 OS 檔案系統特性,寫入持久化檔案時並不呼叫 flush,僅寫入 OS pagecache,後續由 OS flush。

這些特性決定了 Kafka 沒有做“確認機制”,而是直接將生產訊息順序寫入檔案、訊息消費後不刪除(避免檔案更新),該實現充分利用了磁碟 IO,能夠達到較高的吞吐量。代價是消費者要依賴 Zookeeper 記錄佇列消費位置、處理同步問題。沒有消費確認機制,還導致了 Kafka 無法瞭解消費者速度,不能採用 push 模型以合理的速度向消費者推送資料,只能利用 pull 模型由消費者來拉訊息(消費者承擔額外的輪詢開銷)。

如果在 Kafka 中引入消費者確認機制,就需要 broker 維護訊息消費狀態,要做到高可靠就需要寫檔案持久化並與生產訊息同步,這將急劇降低 Kafka 的效能,這種設計也極類似 RabbitMQ。如果不改變 Kafka 的實現,而是在 Kafka 和消費者之間做一層封裝,還是需要實現一套類似 RabbitMQ 的消費確認和持久化機制。

參考資料:

相關推薦

【面試寶典】訊息佇列如何保證性?

面試題:那麼來說稅如何保證訊息佇列的冪等性? 面試官心理剖析: 主要是看你對訊息佇列資料重複消費的問題,是否有了解,是否知道怎麼解決?如果這塊不知道,那麼面試官會覺得如果交給你做功能,可能會出現多次消費的情況。 為什麼會出現重複消費? 問題解決參考http://www.wityx.com/d

JS監聽微信支付寶移動app及瀏覽器的返回後退上一頁按鈕的事件方法

on() 移動app 自己的 win 功能 監聽 ner tor event $(function(){ pushHistory(); window.addEventListener("popstate", function(e) { alert("我監聽到

很多人都不知道的監聽微信支付寶移動app及瀏覽器的返回後退上一頁按鈕的事件方法

在實際的應用中,我們常常需要實現在移動app和瀏覽器中點選返回、後退、上一頁等按鈕實現自己的關閉頁面、調整到指定頁面或執行一些其它操作的 需求,那在程式碼中怎樣監聽當點選微信、支付寶、百度糯米、百度錢包等app的返回按鈕或者瀏覽器的上一頁或後退按鈕的事件呢。 我相信很多朋

RabbitMQ 訊息順序訊息訊息重複訊息事務叢集

1. 訊息順序 場景:比如下單操作,下單成功之後,會發布建立訂單和扣減庫存訊息,但扣減庫存訊息執行會先於建立訂單訊息,也就說前者執行成功之後,才能執行後者。 不保證完全按照順序消費,在 MQ 層面支援訊息的順序處理開銷太大,為了極少量的需求,增加整體上的複雜度得不償失。 所以,還是在應用層面處理比較好,或者業

Kafka學習——二Kafka 基本概念——訊息批次主題分割槽borker消費群組

Kafka 基本概念 訊息:類似於資料庫的記錄,由位元組陣列組成,訊息裡的資料沒有特別的格式或含義。訊息可以有一個可選的元資料(鍵),主要是當以一種可控的方式寫入不同的分割槽時,需要用到。 批次:用於提高效率,將訊息分批次寫入kafka,批次也就是一組訊息,但這些訊息屬於同一主題

如何保證訊息佇列的高可用和性以及資料丟失,順序一致性

如何保證訊息佇列的高可用和冪等性以及資料丟失,順序一致性 <!-- more --> RabbitMQ的高可用性 RabbitMQ是比較有代表性的,因為是基於主從做高可用性的,我們就以他為例子講解第一種MQ的高可用性怎麼實現。 rabbitmq有三種模式: 單機模式 普通叢集模

RabbitMQ 訊息持久化事務Publisher的訊息確認機制

RabbitMQ  訊息持久化、事務、Publisher的訊息確認機制 1. 宣告MessageQueue 在RabbitMQ中,無論是生產者傳送訊息還是消費者接受訊息,都首先需要宣告一個MessageQueue。 這就存在一個問題,是生產者宣告還是消費者宣告呢?要解決這個

可在廣域網部署執行的QQ高仿版 -- GG嘰嘰V3.6,增加語音訊息語音留言功能

  自從微信出來後,語音訊息和語音留言變得非常流行,按下一個鍵說話,比打字要方便多了。GG在V3.6版本增加了對語音訊息和語音留言(或稱為離線語音訊息)的支援。這兩個功能的實現已經很完整,只是比較遺憾的一點是:GG所使用的文字框控制元件,還沒有辦法像微信的聊天視窗的文字框那樣嵌入表示語音訊息的控制元件,所以,

rabbitmq系列(三)訊息性處理

一、springboot整合rabbitmq 我們需要新建兩個工程,一個作為生產者,另一個作為消費者。在pom.xml中新增amqp依賴: <dependency> <groupId>org.springframework.boot</groupId>

通過Protobuf整合Netty實現對協議訊息客戶端與伺服器通訊實戰

目錄 一、Protocol Buffers 是什麼? 二、Protocol Buffers 檔案和訊息詳解 三、專案實戰,直接掌握的protobuf應用。 一、Protocol Buffers 是什麼?         1、官網翻譯

RocketMQ-訊息重試,訊息去重,訊息模式

訊息重試 Rocketmq提供了訊息重試機制,這是一些其他訊息佇列沒有的功能。我們可以依靠這個優秀的機制,而不用在開發中增加更多的業務程式碼去實現 Consumer 消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。Consumer 消費訊息失敗通常可以認為有以下幾種情況  

[Xcode10 實際操作]八網路與多執行緒-(18)PerformSelector訊息處理方法:由執行時系統,負責去呼叫物件的指定方法

本文將演示PerformSelector訊息處理方法。在專案資料夾上點選滑鼠右鍵彈出檔案選單。【New File】->【Swift File】->【Next】->【Save As】:iOSApp.swift->【Create】現在開始編寫程式碼,建立Swift類【iOSApp.swif

JS-呼叫棧事件迴圈訊息佇列(也叫任務隊和回撥佇列)作業佇列(微任務佇列)

一:呼叫棧是個什麼鬼東西,它具有棧的屬性--後進先出 先看一段簡單的JS程式碼: const second = function(){ console.log('hello there'); } const first = function() { console.log('hi,first'); secon

01分散式鎖(互斥性性)

隨著網際網路資訊科技的飛速發展,資料量不斷增大,業務邏輯也日趨複雜,對系統的高併發訪問、海量資料處理的場景也越來越多。如何用較低成本實現系統的高可用、易伸縮、可擴充套件等目標就顯得越發重要。為了解決這一系列問題,系統架構也在不斷演進。傳統的集中式系統已經逐漸無法滿足要求,分散式系統被使用在更多的場景

分散式系統一致性(ACIDCAPBASE二段提交三段提交TCC性)原理詳解

本文內容屬於分散式事物處理範疇,強調強一致性與區塊鏈倡導的共識一致性略有不同,主要是最近以太坊ico導致交易擁堵問題引起我的思考,是不是有可能利用已有的解決方案進行演變增加每個節點的提交時的一致性,簡單來講就是在p2p網路中實現部分強一致性,提升區塊鏈交易效能,文中TCC就是一種演變思路,很

MQ之如何做到訊息 (轉 優秀)

一、緣起 MQ訊息必達,架構上有兩個核心設計點: (1)訊息落地 (2)訊息超時、重傳、確認   再次回顧訊息匯流排核心架構,它由 傳送端、服務端、固化儲存、接收端 四大部分

以SpringMVC框架為中心瘋狂擴充套件-06MessageListener實時監聽ActiveMQ中的訊息

1、在spring-activemq.xml中新加入listenerContainer和syxTopicDest等配置,實現訊息監聽容器,在connectionFactory中加入clientId。 <?xml version="1.0" encoding="UTF

順序棧的定義初始化出棧入棧操作 C++程式碼實現 ——感想

using namespace std; /*順序棧的定義*/ #define Stack_Size 100 typedef struct sqStack {        char *elem;        int top;        int stackSize;//棧陣列長度 }sqStack;

HTTP性及GETPOSTPUTDELETE的區別

基於HTTP協議的Web API是時下最為流行的一種分散式服務提供方式。無論是在大型網際網路應用還是企業級架構中,我們都見到了越來越多的SOA或RESTful的Web API。為什麼Web API如此流行呢?我認為很大程度上應歸功於簡單有效的HTTP協議。HTTP協議是一種分散式的面向資源的網路應用層協議,

34分散式服務介面的性如何設計(比如不能重複扣款)?

1、面試題 分散式服務介面的冪等性如何設計(比如不能重複扣款)? 2、面試官心裡分析 從這個問題開始,面試官就已經進入了實際的生產問題的面試了。 一個分散式系統中的某個介面,要保證冪等性,該如何保證?這個事兒其實是你做分散式系統的時候必須要考慮的一個生產環境的技術問題。啥意思呢?