1. 程式人生 > >RabbitMQ消息可靠性分析和應用

RabbitMQ消息可靠性分析和應用

ndb -c byte ref one route strong free 消息發布

原文:RabbitMQ消息可靠性分析和應用

RabbitMQ流程簡介(帶Exchange)

RabbitMQ使用一些機制來保證可靠性,如持久化、消費確認及發布確認等。

先看以下這個圖:

技術分享圖片

P為生產者,X為中轉站(Exchange),紅色部分為消息隊列,C1、C2為消費者。

整個流程分成三部分:第一,生產者生產消息,發送到中轉站;第二,中轉站按定義的規則轉發消息到消息隊列;第三,消費者從消息隊列獲取消息進行消費(處理)。


RabbitMQ消息可靠性分析和應用

應用代碼均使用C#客戶端代碼實現。

一、發布確認

生產者生產消息,發送到中轉站的過程中,可能會因為網絡丟包、網絡故障等問題造成消息丟失。為了確保生產者發送的消息不會丟失,RabbitMQ提供了發布確認(Publisher Confirms)機制,從而提高消息的可靠性(註意:發布確認機制不能和事務機制一起使用)。

單條消息發布確認:

       channel.ConfirmSelect();//發布確認機制
                string message = "msg";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(
                        exchange: 
"MarkTopicChange", routingKey: "MarkRouteKey.one", basicProperties: null, body: body ); bool isPublished = channel.WaitForConfirms();//通道(channel)裏消息發送成功返回true

使用channel.ConfirmSelect,一旦信道進入確認模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始)。消息被投遞到所有匹配的隊列之後,RabbitMQ就會發送(Basic.Ack)給生產者(包含消息的唯一ID),生產者從而知道消息發送成功。

多條消息發布確認:

   channel.ConfirmSelect();//發布確認機制
                foreach (var itemMsg in lstMsg)
                {
                    byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);
                    //發布消息
                    channel.BasicPublish(
                        exchange: "MarkTopicChange",
                        routingKey: "MarkRouteKey.one",
                        basicProperties: null,
                        body: sendBytes
                        );
                }
                bool isAllPublished = channel.WaitForConfirms();//通道(channel)裏所有消息均發送才返回true 

註意:多消息發布確認機制情況下,倘若要發送100條消息,發送90條後,突然網絡故障,後面的消息發送失敗了,那麽isAllPublished返回的是false,而前面90條消息已經發送到消息隊列了。我們還不知道哪些消息是發送失敗的,所以很多條消息發布確認,建議分幾次發送或多通道發送。

此外,需要確保在中轉站(Exchange)的消息可以順利到達消息隊列。

(1)首先需要定義匹配的Exchange和Queue,根據Exchange的類型和routingKey確定轉發的關系。

(2)設置BasicPublish方法中mandatory參數為true,然後監聽Exchange中沒有匹配的隊列的消息,然後進行相操作。

(3)確保消息隊列有足夠內存存儲消息。

RabbitMQ默認配置vm_memory_high_watermark為0.4。意思是控制消息占40%內存左右。vm_memory_high_watermark_paging_ratio為0.5,當消息占用內存超過50%,RabbitMQ會把消息轉移到磁盤上以釋放內存。當磁盤剩余空間小於閥值disk_free_limit(默認為50M),所有生產者阻塞,避免充滿磁盤,導致所有的寫操作失敗。

RabbitMQ配置文件一般在%APPDATA%\RabbitMQ\rabbitmq.config.

%APPDATA% 一般為 C:\Users\%USERNAME%\AppData\Roaming(Windows環境)


二、持久化

消息存放到消息隊列後,在不配置消息持久化的情況下,若服務器重啟、關閉或宕機等,消息都會丟失。配置持久化可以有效提高消息的可靠性。持久化需要同時配置消息持久化和隊列持久化。單配置消息持久化,隊列消失了,消息沒有地方存放;單配置隊列持久化,隊列還在,消息沒了。

隊列持久化在定義隊列時候配置

                //定義隊列
                channel.QueueDeclare(
                    queue: "Mark_Queue", //隊列名稱
                    durable: true, //隊列磁盤持久化                   
                    exclusive: false,//是否排他的,false。如果一個隊列聲明為排他隊列,該隊列首次聲明它的連接可見,並在連接斷開時自動刪除
                    autoDelete: false,//是否自動刪除,一般設成false
                    arguments: null
                    );

  消息持久化在發布消息時候配置

                //消息持久化,把DeliveryMode設成2
                IBasicProperties properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2;
                    //發布消息
                    channel.BasicPublish(
                        exchange: "MarkTopicChange",
                        routingKey: "MarkRouteKey.one",
                        basicProperties: properties,
                        body: sendBytes
                        );

如何配置了事務機制或發布確認(publisher confirm)機制,服務端的返回Basic.Ack是在消息落盤之後執行的,進一步的提高了消息的可靠性。

為了防止磁盤損壞帶來的消息丟失,可以配置鏡像隊列,這裏不作介紹。


三、消費確認

為了確保消息被消費者消費,RabbitMQ提供消費確認模式(consumer Acknowledgements)。自動確認模式,當消費者成功接收到消息後,自動通知RabbitMQ,把消息隊列中相應消息刪除。這很大程度上滿足不了我們,假如消費者接收到消息後,服務器宕機,消息還沒處理完成,這樣就會造成消息丟失。手動確認模式,當消費者成功處理完消息後,手動發消息通知RabbitMQ,把消息隊列中相應消息刪除。

                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘",
                                          routingKey,
                                          message);

                    //確認該消息已被消費,發刪除消息給RabbitMQ,把消息隊列中的消息刪除 
                    channel.BasicAck(ea.DeliveryTag, false);
                    //消費消息失敗,拒絕此消息,重回隊列,讓它可以繼續發送到其他消費者 
                    //channel.BasicReject(ea.DeliveryTag, true);
                    //消費消息失敗,拒絕多條消息,重回隊列,讓它們可以繼續發送到其他消費者 
                    //channel.BasicNack(ea.DeliveryTag, true, true);
                    };
                    //手動確認消息,把autoAck設成false
                    channel.BasicConsume(queue: "Mark_Queue",
                                         autoAck: false,
                                         consumer: consumer);

這裏值得註意的是,消息處理完成後,一定要把處理完成的消息發送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),不然RabbitMQ會一直等待,從而造成內存泄露。若處理消息過程中發生異常,可以使用channel.BasicReject(ea.DeliveryTag, true)來拒絕此消息,讓它重回隊列。若RabbitMQ收不到消費者任何確認消息的信號(包括確認信號,拒絕信號燈),直到此消費者斷開連接,消息才能重回隊列,繼續發送到其他消費者。

提醒一下,假如消費者消費消息的方法不支持並發(取決於需求),可以限制消費者每次只接收一條消息。

channel.BasicQos(0, 1, false);


  

RabbitMQ消息可靠性分析和應用