RabbitMQ 從入門到精通(二)
目錄
- 1. 訊息如何保障百分之百的投遞成功?
- 1.1 方案一:訊息落庫,對訊息狀態進行打標
- 1.2 方案二:訊息的延遲投遞,做二次確認,回撥檢查
- 2. 冪等性
- 2.1 冪等性是什麼?
- 2.2 訊息端冪等性保障
- 3. Confirm機制
- 3.1 如何理解?
- 3.2 怎麼實現?
- 4. Return機制
- 4.1 如何理解?
- 4.2 如何實現?
1. 訊息如何保障百分之百的投遞成功?
什麼是生產端的可靠性投遞?
- 保障訊息的成功發出
- 保障MQ節點的成功接收
- 傳送端收到MQ節點(Broker)確認應答
- 完善的進行訊息補償機制
如果想保障訊息百分百投遞成功,只做到前三步不一定能夠保障。有些時候或者說有些極端情況,比如生產端在投遞訊息時可能就失敗了,或者說生產端投遞了訊息,MQ也收到了,MQ在返回確認應答時,由於網路閃斷導致生產端沒有收到應答,此時這條訊息就不知道投遞成功了還是失敗了,所以針對這些情況我們需要做一些補償機制。
1.1 方案一:訊息落庫,對訊息狀態進行打標
- 進行資料的入庫,比如我們要傳送一條訂單訊息,首先得把業務資料也就是訂單資訊存庫,然後生成一條訊息,把訊息也進行入庫,這條訊息應該包含訊息狀態屬性 Create_Date(建立時間),並設定初始標誌 比如0,表示訊息建立成功,正在傳送中
- 首先要保證第一步訊息都儲存成功了,沒有出現任何異常情況,然後生產端再進行訊息傳送。如果失敗了就進行快速失敗機制
- MQ把訊息收到的結果應答
(confirm)
給生產端 生產端有一個
Confirm Listener
,去非同步的監聽Broker
回送的響應,從而判斷訊息是否投遞成功,如果成功,去資料庫查詢該訊息,並將訊息狀態更新為1,表示訊息投遞成功
假設第二步OK了,在第三步回送響應時,網路突然出現了閃斷,導致生產端的Listener就永遠收不到這條訊息的confirm應答了,也就是說這條訊息的狀態就一直為0了- 此時我們需要設定一個規則,比如說訊息在入庫時候設定一個臨界值timeout,5分鐘之後如果還是0的狀態那就需要把訊息抽取出來。這裡我們使用的是分散式定時任務,去定時抓取DB中距離訊息建立時間超過5分鐘的且狀態為0的訊息。
- 把抓取出來的訊息進行重新投遞
(Retry Send)
,也就是從第二步開始繼續往下走 當然有些訊息可能就是由於一些實際的問題無法路由到Broker,比如routingKey設定不對,對應的佇列被誤刪除了,那麼這種訊息即使重試多次也仍然無法投遞成功,所以需要對重試次數做限制,比如限制3次,如果投遞次數大於三次,那麼就將訊息狀態更新為2,表示這個訊息最終投遞失敗。
針對這種情況如何去做補償呢,可以有一個補償系統去查詢這些最終失敗的訊息,然後給出失敗的原因,當然這些可能都需要人工去操作。
第一種可靠性投遞,在高併發的場景下是否適合?
對於第一種方案,我們需要做兩次資料庫的持久化操作,在高併發場景下顯然資料庫存在著效能瓶頸。其實在我們的核心鏈路中只需要對業務進行入庫就可以了,訊息就沒必要先入庫了,我們可以做訊息的延遲投遞,做二次確認,回撥檢查。
當然這種方案不一定能保障百分百投遞成功,但是基本上可以保障大概99.9%的訊息是OK的,有些特別極端的情況只能是人工去做補償了,或者使用定時任務去做都可以。
1.2 方案二:訊息的延遲投遞,做二次確認,回撥檢查
Upstream Service
上游服務也就是生產端,Downstream service
下游服務也就是消費端,Callback service
就是回撥服務。
- 先將業務訊息進行入庫,然後生產端將訊息傳送出去
- 在傳送訊息之後,緊接著生產端再次傳送一條訊息
(Second Send Delay Check)
,即延遲訊息投遞檢查,這裡需要設定一個延遲時間,比如5分鐘之後進行投遞。 - 消費端去監聽指定佇列,將收到的訊息進行處理。
- 處理完成之後,傳送一個
confirm
訊息,也就是回送響應,但是這裡響應不是正常的ACK,而是重新生成一條訊息,投遞到MQ中。 - 上面的
Callback service
是一個單獨的服務,其實它扮演了第一種方案的儲存訊息的DB角色,它通過MQ去監聽下游服務傳送的confirm
訊息,如果Callback service
收到confirm
訊息,那麼就對訊息做持久化儲存,即將訊息持久化到DB中。 - 5分鐘之後延遲訊息傳送到MQ了,然後
Callback service
還是去監聽延遲訊息所對應的佇列,收到Check訊息後去檢查DB中是否存在訊息,如果存在,則不需要做任何處理,如果不存在或者消費失敗了,那麼Callback service
就需要主動發起RPC通訊給上游服務,告訴它延遲檢查的這條訊息我沒有找到,你需要重新發送,生產端收到資訊後就會重新查詢業務訊息然後將訊息傳送出去。
這麼做的目的是少做了一次DB的儲存,在高併發場景下,最關心的不是訊息100%投遞成功,而是一定要保證效能,保證能抗得住這麼大的併發量。所以能節省資料庫的操作就儘量節省,可以非同步的進行補償。
其實在主流程裡面是沒有這個Callback service的,它屬於一個補償的服務,整個核心鏈路就是生產端入庫業務訊息,傳送訊息到MQ,消費端監聽佇列,消費訊息。其他的步驟都是一個補償機制。
第二種方案也是網際網路大廠更為經典和主流的解決方案。但是若對效能要求不是那麼高,第一種方案要更簡單
2. 冪等性
2.1 冪等性是什麼?
簡單來說就是使用者對於同一操作發起的一次請求或者多次請求的結果是一致的。
我們可以借鑑資料庫的樂觀鎖機制來舉個例子:
首先為表新增一個版本欄位version
在執行更新操作前呢,會先去資料庫查詢這個version
然後執行更新語句,以version作為條件,例如:
UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1
如果執行更新時有其他人先更新了這張表的資料,那麼這個條件就不生效了,也就不會執行操作了,通過這種樂觀鎖的機制來保障冪等性。
2.2 訊息端冪等性保障
重複消費問題:
當消費者消費完訊息時,在給生產端返回ack時由於網路中斷,導致生產端未收到確認資訊,該條訊息會重新發送並被消費者消費,但實際上該消費者已成功消費了該條訊息,這就是重複消費問題。
2.2.1 唯一ID+指紋碼機制
唯一ID:業務表唯一的主鍵,如商品ID
指紋碼:為了區別每次正常操作的碼,每次操作時生成指紋碼;可以用時間戳+業務編號或者標誌位(具體視業務場景而定)
- 唯一ID+指紋碼機制,利用資料庫主鍵去重
- SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID and IS_CONSUM= 指紋碼
- 好處:實現簡單
- 壞處:高併發下有資料庫寫入的效能瓶頸
- 解決方案:根據ID進行分庫分表演算法路由
整個思路就是首先我們需要根據訊息生成一個全域性唯一的ID,然後還需要加上一個指紋碼。這個指紋碼它並不一定是系統去生成的,而是一些外部的規則或者內部的業務規則去拼接,它的目的就是為了保障這次操作是絕對唯一的。
將ID + 指紋碼拼接好的值作為資料庫主鍵,就可以進行去重了。即在消費訊息前呢,先去資料庫查詢這條訊息的指紋碼標識是否存在,沒有就執行insert操作,如果有就代表已經被消費了,就不需要管了。
2.2.2 利用Redis的原子性去實現
這裡只提用Redis的原子性去解決MQ冪等性重複消費的問題
注意:MQ的冪等性問題 根本在於的是生產端未正常接收ACK,可能是網路抖動、網路中斷導致
我的方案:
MQ消費端在消費開始時 將 ID放入到Redis的BitMap中,MQ生產端每次生產資料時,從Redis的BitMap對應位置若不能取出ID,則生產訊息傳送,否則不進行訊息傳送。
但是有人可能會說,萬一消費端,生產端Redis命令執行失敗了怎麼辦,雖然又出現重複消費又出現Redis非正常執行命令的可能性極低,但是萬一呢?
OK,我們可以在Redis命令執行失敗時,將訊息落庫,每日用定時器,對這種極特殊的訊息進行處理。
3. Confirm機制
3.1 如何理解?
- 訊息的確認,是指生產者投遞訊息後,如果Broker收到訊息,則會給我們生產者一個應答
- 生產者進行接收應答,用來確定這條訊息是否正常的傳送到Broker,這種方式也是訊息的可靠性投遞
的核心保障
確認機制流程圖
生產端傳送訊息到Broker,然後Broker接收到了訊息後,進行回送響應,生產端有一個Confirm Listener
,去監聽應答,當然這個操作是非同步進行的,生產端將訊息傳送出去就可以不用管了,讓內部監聽器去監聽Broker給我們的響應。
3.2 怎麼實現?
- 第一步,在channel上開啟確認模式:
channel.confirmSelect()
- 第二步,在channel上新增監聽:
addConfirmListener
,監聽成功和失敗的返回結果,根據具體的結果對訊息進行重新發送、或記錄日誌等後續處理!
public class Producer {
public static void main(String[] args) throws Exception {
//建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//獲取Connection
Connection connection = connectionFactory.newConnection();
//通過connection建立一個新的Channel
Channel channel = connection.createChannel();
//指定我們的訊息投遞模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingkey = "confirm.save";
//傳送一條資訊
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
//新增一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("-------no ack!---------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("--------ack!----------");
}
});
}
}
public class Consumer {
public static void main(String[] args) throws Exception{
//建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//獲取Connection
Connection connection = connectionFactory.newConnection();
//通過connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingkey = "confirm.#";
String queueName = "test_confirm_queue";
//宣告交換機和佇列 然後進行繫結和 設定 最後制定路由key
channel.exchangeDeclare(exchangeName, "topic",true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingkey);
//建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true,queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費端:" + msg);
}
}
}
執行說明
先啟動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設定OK,然後啟動生產端,訊息被消費端消費,生產端也成功監聽到了ACK響應。
4. Return機制
4.1 如何理解?
Return Listener
用於處理一些不可路由的訊息!- 我們的訊息生產者,通過指定一個Exchange 和Routingkey,把訊息送達到某一個佇列中去, 然後我們的消費者監聽佇列,進行消費處理操作!
- 但是在某些情況下,如果我們在傳送訊息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的訊息,就要使用
Return Listener
!
4.2 如何實現?
- 新增return監聽:
addReturnListener
,生產端去監聽這些不可達的訊息,做一些後續處理,比如說,記錄下訊息日誌,或者及時去跟蹤記錄,有可能重新設定一下就好了 - 傳送訊息時,設定
Mandatory
:如果為true,則監聽器會接收到路由不可達的訊息,然後進行後續處理,如果為false,那麼broker端自動刪除該訊息!
public class ReturnProducer {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
//String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
//新增return監聽
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, BasicProperties properties, byte[] body) throws IOException {
//replyCode:響應碼 replyText:響應資訊
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
//System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//5 傳送一條訊息,第三個引數mandatory:必須設定為true
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}
public class ReturnConsumer {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
//4 宣告交換機和佇列,然後進行繫結設定路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消費者: " + msg);
}
}
}
執行說明
先啟動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設定OK,然後啟動生產端。
由於生產端設定的是一個錯誤的路由key,所以消費端沒有任何列印,而生產端列印瞭如下內容
如果我們將 Mandatory
屬性設定為false,對於不可達的訊息會被Broker直接刪除,那麼生產端就不會進行任何列印了。如果我們的路由key設定為正確的,那麼消費端能夠正確消費,生產端也不會進行任何打