RabbitMQ訊息丟失問題和保證訊息可靠性-消費端不丟訊息和HA(二)
繼續上篇文章解決RabbitMQ訊息丟失問題和保證訊息可靠性(一) 未完成部分,我們聊聊MQ Server端的高可用和消費端如何保證訊息不丟的問題?
迴歸上篇的內容,我們知道訊息從生產端到服務端,為了保證訊息不丟,我們必須做哪些事情?
- 傳送端採用Confirm模式,注意Server端沒成功通知傳送端,需要重發操作需要額外處理
- 訊息的持久化處理
上面兩個操作保證訊息到服務端不丟,但是非高可用狀態,如果節點掛掉,服務暫時不可用,需要重啟後,訊息恢復,訊息不會丟失,因為有磁碟儲存。
本文先從消費端講起:
RabbitMQ Server到消費者訊息如何不丟?
上面一篇文章也提到了,消費者獲取到訊息之後,沒有來得及處理完畢,自己直接宕機了,因為訊息者預設採用自動ack,此時RabbitMQ的自動ack機制會通知MQ Server這條訊息已經處理好了,此時訊息就丟了,並不是預期的。
那麼我們採用手動ack機制來解決這個問題,消費端處理完邏輯之後再通知MQ Server,這樣消費者沒處理完訊息不會發送ack,如果在消費者拿到訊息,沒來得及處理的情況下自己掛了,此時MQ叢集會自動感知到,它就會自覺的重發訊息給其他的消費者服務例項。
根據上面的思路你需要完成下面的兩步操作:
第一:消費者監聽設定手動ack
this.channel = channelManager.getListenerChannel(namespace);
this.queue = queue;
this.channel.basicConsume(queue, false, consumerTag, this);
this.disconnectedCallback.setChannel(channel);
核心程式碼: this.channel.basicConsume(queue, false, consumerTag, this); 第二個引數設定 false 代表不自動ack
第二:業務執行完成後手動ack
public static void ack(MessageContext context) {
long deliveryTag = context.getEnvelope().getDeliveryTag();
try {
context.getChannel().basicAck(deliveryTag, false);
} catch (IOException e) {
throw new MqAckException("訊息ack出錯:連線異常或遠端關閉", context, e);
}
}
核心程式碼: context.getChannel().basicAck(deliveryTag, false);
這裡封裝來,需要業務在執行完自己的業務程式碼後,呼叫物件channel 的ack方法通知MQServer,說我這邊執行完了,你可以刪除了。
注意這裡有個問題: 如果忘記呼叫這個 context.getChannel().basicAck(deliveryTag, false);
或者因為程式碼異常,這個程式碼沒被執行,會怎麼樣?後面找時間再寫一篇文章講這個問題。
RabbitMQ Server中儲存的訊息高可用
當我們解決了,生產端和消費端的問題後,基本保證訊息的不丟問題,但是還有一個是訊息的高可用問題,單節點問題,普通節點的問題都會影響訊息的臨時不可用,這個時候要用上我們的HA 映象叢集模式來保證。
上一篇文章 解決RabbitMQ訊息丟失問題和保證訊息可靠性(一) 已經提到過,服務端訊息部署的三種模式的區別,今天就專門講映象模式的介紹。
映象模式至少採用3節點,2個磁碟節點和1個記憶體節點來保證,架構圖:
設定映象也有一些策略:
- 同步至所有的,一般不這麼做,效能會受到極大影響
- 同步最多N個機器
- 只同步至符合指定名稱的nodes
命令處理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
- 為每個以“rock.wechat”開頭的佇列設定所有節點的映象,並且設定為自動同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
- 為每個以“rock.wechat.”開頭的佇列設定兩個節點的映象,並且設定為自動同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
- 為每個以“node.”開頭的佇列分配指定的節點做映象
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
但是:HA 映象佇列有一個很大的缺點就是: 系統的吞吐量會有所下降
所以採用映象模式,要根據具體的業務規則定製話處理,沒那麼重要的業務,訊息丟了也沒關係的場景,又要求必須高的效能的時候,映象也可以不用設定。
總結
兩篇文章的講解,分析了訊息中介軟體高可用問題的大概的思路,沒有具體的程式碼詳細,如有疑問可以下方留言評論,我會及時回覆解答,後面我會逐步完善相關細節,歡迎多多關注。
後面計劃更新文章如下:
- 什麼情況會導致重複消費並怎麼解決?
- 什麼樣的真實業務場景需要保障順序性和如何保證訊息的順序性?
- 如何通過訊息佇列優雅的解決微服務間介面失敗的重試?
推薦閱讀
解決RabbitMQ訊息丟失問題和保證訊息可靠性(一)
IntelliJ IDEA提升效率開發外掛必備
END
如有收穫,請幫忙轉發,後續會有更好文章貢獻,您的鼓勵是作者最大的動力!
歡迎關注我的公眾號:架構師的修煉,獲得獨家整理的學習資源和日常乾貨推送。
相關推薦
RabbitMQ訊息丟失問題和保證訊息可靠性-消費端不丟訊息和HA(二)
繼續上篇文章解決RabbitMQ訊息丟失問題和保證訊息可靠性(一) 未完成部分,我們聊聊MQ Server端的高可用和消費端如何保證訊息不丟的問題? 迴歸上篇的內容,我們知道訊息從生產端到服務端,為了保證訊息不丟,我們必須做哪些事情? 傳送端採用Confirm模式,注意Server端沒成功通
利用Objective-C的反射機制和執行時特性實現類靜態方法的動態訪問(二)
繼上次的研究成果繼續深入研究,灑家又完善了下在執行時動態呼叫所有OC類方法的公用方法: typedef void*(*ObjcMsgSend)(id, SEL, ...); - (void *)invoke:(id)inst method:(NSString *)nam
基於 abp vNext 和 .NET Core 開發部落格專案 - 定時任務最佳實戰(二)
上一篇(https://www.cnblogs.com/meowv/p/12971041.html)使用`HtmlAgilityPack`抓取桌布資料成功將圖片存入資料庫,本篇繼續來完成一個全網各大平臺的熱點新聞資料的抓取。 同樣的,可以先預覽一下我個人部落格中的成品:https://meowv.com/h
RabbitMQ學習筆記(二)-----------------RabbitMQ生產消費訊息
專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Hello_RabbitMQ 專案結構 需要的jar包 專案流程圖 x 首先是生產者的類,我們需要與RabbitServ
RabbitMQ訊息佇列系列教程(二)Windows下安裝和部署RabbitMQ
摘要 本篇經驗將和大家介紹Windows下安裝和部署RabbitMQ訊息佇列伺服器,希望對大家的工作和學習有所幫助! 目錄 一、Erlang語言環境的搭建 RabbitMQ開源訊息佇列服務是使用Erlang語言開發的,因此我們要使用他就必須先進行Erlang語言環境的搭建,其實是非常簡
Windows程式和訊息機制(二):訊息有關的函式
不同視窗程式可以通過訊息進行互動,主要用到的函式如下: FindWindow 獲取一個視窗的控制代碼。 HWND FindWindow( LPCTSTR lpClassName,// 類名 LPCTSTR lpWindowName//
【Spring訊息】RabbitMq安裝及簡單應用(二)
前言: 埋頭苦寫。先把官方文件翻譯過來。整個流程跑一遍。上一篇文章,【Spring訊息】RabbitMq安裝及簡單應用(一),把點對點發送訊息寫完了。之前雖然也可以一個生產者多個消費者,但是一條訊息只能被一個消費者處理,所以是點對點。這篇文章來講講釋出訂閱,一對多。一條訊息
Android之訊息處理機制(二)Handler的本質-Message和Looper到底是什麼?
目錄 Android之訊息處理機制(二) 以下皆為乾貨,比較幹,需要讀者細細理解。 前面(一)已經解釋了Handler的基本機制了,下面來概括一下本質。 一、MessageQueue MessageQueue其實就
訊息中介軟體--RabbitMQ學習(二)
Server:又稱 Broker,接受客戶端的連線,實現AMQP實體服務。 Connection:連線,應用程式與 Broker的網路連線。 Channel:網路通道,幾乎所有的操作都在 Channel中進行, Channel是進行訊息讀寫的通道。客戶端可建立多個 hannel,每個 Channel代表一個
RabbitMQ訊息佇列(二):”Hello, World“
原文地址:http://blog.csdn.net/anzhsoft/article/details/19570187 本文將使用Python(pika 0.9.8)實現從Producer到Consumer傳遞資料”Hello, World“。 首先複習一下上篇所學:RabbitMQ實現
RabbitMQ訊息佇列(二) fanout 廣播模式
先粘程式碼 生產者 ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = connectionFa
輕鬆搞定RabbitMQ(二)——工作佇列之訊息分發機制
上一篇博文中簡單介紹了一下RabbitMQ的基礎知識,並寫了一個經典語言入門程式——HelloWorld。本篇博文中我們將會建立一個工作佇列用來在工作者(consumer)間分發耗時任務。同樣是翻譯的官網例項。 工作佇列 在前一篇博文中,我們完
基於Python語言使用RabbitMQ訊息佇列(二)
工作佇列 在第一節我們寫了程式來向命名佇列傳送和接收訊息 。在本節我們會建立一個工作佇列(Work Queue)用來在多個工人(worker)中分發時間消耗型任務(time-consuming tasks)。 工作佇列(又叫做: Task Queues)背後
訊息中介軟體Rabbitmq(二)-使用詳解
https://blog.csdn.net/Dante_003/article/details/79377908Rabbitmq 是基於amqp(高階訊息佇列協議)實現的佇列技術,在他之上可以完成多種型別的訊息轉發模型。 下面列舉一些常用的訊息轉發場景,在rabbitmq中是
(二)RabbitMQ訊息佇列-RabbitMQ訊息佇列架構與基本概念
沒錯我還是沒有講怎麼安裝和寫一個HelloWord,不過快了,這一章我們先了解下RabbitMQ的基本概念。 RabbitMQ架構 說是架構其實更像是應用場景下的架構(自己畫的有點醜,勿嫌棄) 從圖中可以看出RabbitMQ主要由Exchange和Qu
C# Queue與RabbitMQ的愛恨情仇(文末附原始碼):Q與MQ訊息佇列簡單應用(二)
上一章我們講了佇列( Queue),這一章我們講Message Queue訊息佇列,簡稱MQ。 定義: MQ是MessageQueue,訊息佇列的簡稱(是流行的開源訊息佇列系統,利用erlang語言開發)。MQ是一種應用程式對應用程式的通訊方法。 應用程式通過讀寫入隊和出隊的訊息來通訊,無
訊息中介軟體——RabbitMQ(二)各大主流訊息中介軟體綜合對比介紹!
前言 訊息佇列已經逐漸成為企業IT系統內部通訊的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為非同步RPC的主要手段之一。當今市面上有很多主流的訊息中介軟體,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等。今天主要來
.net core 和 WPF 開發升訊威線上客服系統:怎樣實現拔網線也不丟訊息的高可靠通訊(附視訊)
本系列文章詳細介紹使用 .net core 和 WPF 開發 升訊威線上客服與營銷系統 的過程。本產品已經成熟穩定並投入商用。 線上演示環境:[https://kf.shengxunwei.com](https://kf.shengxunwei.com) 注意:演示環境僅供演示交流與評估,不保證 7x24 小
WPF的訊息機制(二)- WPF內部的5個視窗之隱藏訊息視窗
原文: WPF的訊息機制(二)- WPF內部的5個視窗之隱藏訊息視窗 目錄 WPF的訊息機制(一)-讓應用程式動起來 WPF的訊息機制(二)-WPF內部的5個視窗 (1)隱藏訊息視窗 (2)處理啟用和關閉的訊息的視窗和系統資源通知視窗 (3)用於
PHP規範PSR7(HTTP訊息介面)介紹(二)
1.3 流 HTTP訊息由起始行,標題和正文組成。 HTTP訊息的主體可能非常小或非常大。嘗試將訊息正文表示為字串很容易消耗比預期更多的記憶體,因為正文必須完全儲存在記憶體中。嘗試將請求或響應的主體儲存在記憶體中將阻止使用該實現能夠使用大型訊息體。 StreamInterface用於在讀取或寫入