RabbitMQ消息可靠性分析和應用
RabbitMQ流程簡介(帶Exchange)
RabbitMQ使用一些機制來保證可靠性,如持久化、消費確認及發布確認等。
先看以下這個圖:
P為生產者,X為中轉站(Exchange),紅色部分為消息隊列,C1、C2為消費者。
整個流程分成三部分:第一,生產者生產消息,發送到中轉站;第二,中轉站按定義的規則轉發消息到消息隊列;第三,消費者從消息隊列獲取消息進行消費(處理)。
RabbitMQ消息可靠性分析和應用
應用代碼均使用C#客戶端代碼實現。
一、發布確認
生產者生產消息,發送到中轉站的過程中,可能會因為網絡丟包、網絡故障等問題造成消息丟失。為了確保生產者發送的消息不會丟失,RabbitMQ提供了發布確認(Publisher Confirms)機制,從而提高消息的可靠性(註意:發布確認機制不能和事務機制一起使用)。
單條消息發布確認:
channel.ConfirmSelect();//發布確認機制 string message = "msg"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange:"MarkTopicChange", routingKey: "MarkRouteKey.one", basicProperties: null, body: body ); bool isPublished = channel.WaitForConfirms();//通道(channel)裏消息發送成功返回true
使用channel.ConfirmSelect,一旦信道進入確認模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始)。消息被投遞到所有匹配的隊列之後,RabbitMQ就會發送(Basic.Ack)給生產者(包含消息的唯一ID),生產者從而知道消息發送成功。
多條消息發布確認:
channel.ConfirmSelect();//發布確認機制 foreach (var itemMsg in lstMsg) { byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg); //發布消息 channel.BasicPublish( exchange: "MarkTopicChange", routingKey: "MarkRouteKey.one", basicProperties: null, body: sendBytes ); } bool isAllPublished = channel.WaitForConfirms();//通道(channel)裏所有消息均發送才返回true
註意:多消息發布確認機制情況下,倘若要發送100條消息,發送90條後,突然網絡故障,後面的消息發送失敗了,那麽isAllPublished返回的是false,而前面90條消息已經發送到消息隊列了。我們還不知道哪些消息是發送失敗的,所以很多條消息發布確認,建議分幾次發送或多通道發送。
此外,需要確保在中轉站(Exchange)的消息可以順利到達消息隊列。
(1)首先需要定義匹配的Exchange和Queue,根據Exchange的類型和routingKey確定轉發的關系。
(2)設置BasicPublish方法中mandatory參數為true,然後監聽Exchange中沒有匹配的隊列的消息,然後進行相操作。
(3)確保消息隊列有足夠內存存儲消息。
RabbitMQ默認配置vm_memory_high_watermark為0.4。意思是控制消息占40%內存左右。vm_memory_high_watermark_paging_ratio為0.5,當消息占用內存超過50%,RabbitMQ會把消息轉移到磁盤上以釋放內存。當磁盤剩余空間小於閥值disk_free_limit(默認為50M),所有生產者阻塞,避免充滿磁盤,導致所有的寫操作失敗。
RabbitMQ配置文件一般在%APPDATA%\RabbitMQ\rabbitmq.config.
%APPDATA% 一般為 C:\Users\%USERNAME%\AppData\Roaming(Windows環境)
二、持久化
消息存放到消息隊列後,在不配置消息持久化的情況下,若服務器重啟、關閉或宕機等,消息都會丟失。配置持久化可以有效提高消息的可靠性。持久化需要同時配置消息持久化和隊列持久化。單配置消息持久化,隊列消失了,消息沒有地方存放;單配置隊列持久化,隊列還在,消息沒了。
隊列持久化在定義隊列時候配置
//定義隊列 channel.QueueDeclare( queue: "Mark_Queue", //隊列名稱 durable: true, //隊列磁盤持久化 exclusive: false,//是否排他的,false。如果一個隊列聲明為排他隊列,該隊列首次聲明它的連接可見,並在連接斷開時自動刪除 autoDelete: false,//是否自動刪除,一般設成false arguments: null );
消息持久化在發布消息時候配置
//消息持久化,把DeliveryMode設成2 IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //發布消息 channel.BasicPublish( exchange: "MarkTopicChange", routingKey: "MarkRouteKey.one", basicProperties: properties, body: sendBytes );
如何配置了事務機制或發布確認(publisher confirm)機制,服務端的返回Basic.Ack是在消息落盤之後執行的,進一步的提高了消息的可靠性。
為了防止磁盤損壞帶來的消息丟失,可以配置鏡像隊列,這裏不作介紹。
三、消費確認
為了確保消息被消費者消費,RabbitMQ提供消費確認模式(consumer Acknowledgements)。自動確認模式,當消費者成功接收到消息後,自動通知RabbitMQ,把消息隊列中相應消息刪除。這很大程度上滿足不了我們,假如消費者接收到消息後,服務器宕機,消息還沒處理完成,這樣就會造成消息丟失。手動確認模式,當消費者成功處理完消息後,手動發消息通知RabbitMQ,把消息隊列中相應消息刪除。
consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", routingKey, message); //確認該消息已被消費,發刪除消息給RabbitMQ,把消息隊列中的消息刪除 channel.BasicAck(ea.DeliveryTag, false); //消費消息失敗,拒絕此消息,重回隊列,讓它可以繼續發送到其他消費者 //channel.BasicReject(ea.DeliveryTag, true); //消費消息失敗,拒絕多條消息,重回隊列,讓它們可以繼續發送到其他消費者 //channel.BasicNack(ea.DeliveryTag, true, true); }; //手動確認消息,把autoAck設成false channel.BasicConsume(queue: "Mark_Queue", autoAck: false, consumer: consumer);
這裏值得註意的是,消息處理完成後,一定要把處理完成的消息發送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),不然RabbitMQ會一直等待,從而造成內存泄露。若處理消息過程中發生異常,可以使用channel.BasicReject(ea.DeliveryTag, true)來拒絕此消息,讓它重回隊列。若RabbitMQ收不到消費者任何確認消息的信號(包括確認信號,拒絕信號燈),直到此消費者斷開連接,消息才能重回隊列,繼續發送到其他消費者。
提醒一下,假如消費者消費消息的方法不支持並發(取決於需求),可以限制消費者每次只接收一條消息。
channel.BasicQos(0, 1, false);
RabbitMQ消息可靠性分析和應用