1. 程式人生 > >分布式系統消息中間件——RabbitMQ的使用思考篇

分布式系統消息中間件——RabbitMQ的使用思考篇

內部 機制 () 順序 enc new round 兩個 內容

分布式系統消息中間件——RabbitMQ的使用思考篇

前言

????前面的兩篇文章分布式系統消息中間件——RabbitMQ的使用基礎篇與分布式系統消息中間件——RabbitMQ的使用進階篇,我們簡單介紹了消息中間件與RabbitMQ的一些基本概念、基礎用法以及常用的幾個特性。但如果我們想更好的去結合我們的業務場景使用好RabbitMQ,我們還需要思考一些問題。比如:何時去創建隊列,RabbitMQ的持久化,如何保證消息到達RabbitMQ,以及消費者如何確認消息......

一、何時創建隊列

????從前面的文章我們知道,RabbitMQ可以選擇在生產者創建隊列,也可以在消費者端創建隊列,也可以提前創建好隊列,而生產者消費者直接使用即可。

????RabbitMQ的消息存儲在隊列中,交換器的使用並不真正耗費服務器的性能,而隊列會。如在實際業務應用中,需要對所創建的隊列的流量、內存占用及網卡占用有一個清晰的認知,預估其平均值和峰值,以便在固定硬件資源的情況下能夠進行合理有效的分配。

????按照RabbitMQ官方建議,生產者和消費者都應該嘗試創建(這裏指聲明操作)隊列。這雖然是一個很好的建議,但是在我看來這個時間上沒有最好的方案,只有最適合的方案。我們往往需要結合業務、資源等方面在各種方案裏面選擇一個最適合我們的方案。

????如果業務本身在架構設計之初己經充分地預估了隊列的使用情況,完全可以在業務程序上線之前在服務器上創建好(比如通過頁面管理、RabbitMQ命令或者更好的是從配置中心下發),這樣業務程序也可以免去聲明的過程,直接使用即可。預先創建好資源還有一個好處是,可以確保交換器和隊列之間正確地綁定匹配。很多時候,由於人為因素、代碼缺陷等,發送消息的交換器並沒有綁定任何隊列,那麽消息將會丟失:或者交換器綁定了某個隊列,但是發送消息時的路由鍵無法與現存的隊列匹配,那麽消息也會丟失。當然可以配合mandatory參數或者備份交換器(關於mandatory參數的使用詳細可參考我的上一篇文章) 來提高程序的健壯性。與此同時,預估好隊列的使用情況非常重要,如果在後期運行過程中超過預定的閾值,可以根據實際情況對當前集群進行擴容或者將相應的隊列遷移到其他集群。遷移的過程也可以對業務程序完全透明。此種方法也更有利於開發和運維分工,便於相應資源的管理。如果集群資源充足,而即將使用的隊列所占用的資源又在可控的範圍之內,為了增加業務程序的靈活性,也完全可以在業務程序中聲明隊列。至於是使用預先分配創建資源的靜態方式還是動態的創建方式,需要從業務邏輯本身、公司運維體系和公司硬件資源等方面考慮。

二、持久化及策略

????作為一個內存中間件,在保證了速度的情況下,不可避免存在如內存數據庫同樣的問題,即丟失問題。持久化可以提高RabbitMQ 的可靠性,以防在異常情況(重啟、關閉、宕機等)下的數據丟失。RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化。

  1. 交換器的持久化

????交換器的持久化是通過在聲明隊列是將durable 參數置為true 實現的(該參數默認為false)。如果交換器不設置持久化,那麽在RabbitMQ 服務重啟之後,相關的交換器元數據會丟失,不過消息不會丟失,只是不能將消息發送到這個交換器中了。對一個長期使用的交換器來說,建議將其置為持久化的。

  1. 隊列的持久化

????隊列的持久化是通過在聲明隊列時將durable 參數置為true 實現的(該參數默認為false),如果隊列不設置持久化,那麽在RabbitMQ 服務重啟之後,相關隊列的元數據會丟失,此時數據也會丟失。正所謂"皮之不存,毛將焉附",隊列都沒有了,消息又能存在哪裏呢?

  1. 消息的持久化

????隊列的持久化能保證其本身的元數據不會因異常情況而丟失,但是並不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將其設置為持久化。通過將消息的投遞模式(BasicProperties中的DeliveryMode屬性)設置為2即可實現消息的持久化。

????因此,消息如果要想在Rabbit重啟、關閉、宕機時能夠恢復,需要做到以下三點:

  • 把消息的投遞模式設置為2
  • 發送到持久化的交換器
  • 到達持久化的隊列

????註意:RabbitMQ 確保持久化消息能從服務器重啟中恢復的方式是將它們寫入磁盤上的一個持久化日誌文件中。當發布一條持久化消息到持久化交換器時,Rabbit會在日誌提交到日誌文件後才發送響應(開啟生產者確認機制)。之後,如果消息到了非持久化隊列,它會自動從日誌文件中刪除,並且無法在服務器重啟後恢復。因此單單只設置隊列持久化,重啟之後消息會丟失;單單只設置消息的持久化,重啟之後隊列消失,繼而消息也丟失。單單設置消息持久化而不設置隊列的持久化是毫無意義的。當從持久化隊列中消費了消息後(並且確認後),RabbitMQ會在持久化日誌中把這條消息標記為等待垃圾收集。而在消費持久化消息之前,若RabbitMQ服務器重啟,會自動重建交換器、隊列以及綁定,重播持久化日誌文件中的消息到合適的隊列或者交換器上(取決於宕機時,消息處在路由的哪個環節)。

????為了保障消息不會丟失,也許我們可以簡單粗暴的將所有的消息標記為持久化,但這樣我們會付出性能的代價。寫入磁盤的速度比寫入內存的速度慢得不只一點點。對於可靠性不是那麽高的消息可以不采用持久化處理以提高整體的吞吐量。在選擇是否要將消息持久化時,需要在可靠性和吐吞量之間做一個權衡。

????將交換器、隊列、消息都設置了持久化之後就能百分之百保證數據不丟失了嗎?

  • 從消費者來說,如果在訂閱消費隊列時將noAck參數設置為true ,那麽當消費者接收到相關消息之後,還沒來得及處理就宕機了,這樣也算數據丟失。
  • 在持久化的消息正確存入RabbitMQ 之後,還需要有一段時間(雖然很短,但是不可忽視〉才能存入磁盤之中。RabbitMQ 並不會為每條消息都進行同步存盤的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內RabbitMQ 服務節點發生了巖機、重啟等異常情況,消息保存還沒來得及落盤,那麽這些消息將會丟失。

????關於第一個問題,可以通過消費者確認機制來解決。而第二個問題可以通過生產者確認機制來解決,也可以使用鏡像隊列機制(鏡像隊列機制,將在運維篇總結)。生產者確認消費者確認請往下看。

三、生產者確認

????上文我們知道,在使用RabbitMQ的時候,可以通過消息持久化操作來解決因為服務器的異常崩潰而導致的消息丟失,除此之外,我們還會遇到一個問題,當消息的生產者將消息發送出去之後,消息到底有沒有正確地到達服務器呢?如果不進行特殊配置,默認情況下發送消息的操作是不會返回任何信息給生產者的,也就是默認情況下生產者是不知道消息有沒有正確地到達服務器。如果在消息到達服務器之前己經丟失,持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?

????RabbitMQ針對這個問題,提供了兩種解決方式:

  • 通過事務機制實現:
  • 通過發送方確認(publisher confirm)機制實現。

3.1 RabbitMQ 事務機制

????RabbitMQ 客戶端中與事務機制相關的方法有三個:channel.TxSelect(用於將當前信道設置為事務模式);channel.TxCommit(用於提交事務),channel.TxRollback(用於回滾事務)。在通過channel.TxSelect方法開啟事務之後,我們便可以發布消息給RabbitMQ了,如果事務提交成功,則消息一定到達了RabbitMQ 中,如果在事務提交執行之前由於RabbitMQ異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,進而通過執行channel.TxRollback方法來實現事務回滾。示例代碼如下所示:

  channel.TxSelect();//將信道設置為事務模式
  try
  {
      //do something
      var message = Encoding.UTF8.GetBytes("TestMsg");
      channel.BasicPublish("normalExchange", "NormalRoutingKey", true, null, message);
      //do something
      channel.TxCommit();//提交事務
  }
  catch (Exception ex)
  {
      //log(ex);
      channel.TxRollback();
  }

????事務確實能夠解決消息發送方和RabbitMQ之間消息確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功,否則便可在捕獲異常之後進行事務回滾,與此同時可以進行消息重發。但是使用事務同樣會帶來一些問題。

  • 會阻塞,發布者必須等待broker處理每個消息。
  • 事務是重量級的,每次提交都需要fsync(),需要耗費大量的時間
  • 事務非常耗性能,會降低RabbitMQ的消息吞吐量。

3.2 發送方確認機制

????前面介紹了RabbitMQ可能會遇到的一個問題,即消息發送方(生產者〉並不知道消息是否真正地到達了RabbitMQ。隨後了解到在AMQP協議層面提供了事務機制來解決這個問題,但是采用事務機制實現會嚴重降低RabbitMQ的消息吞吐量,這裏就引入了一種輕量級的方式一發送方確認(publisher confirm)機制。生產者將信道設置成confirm確認)模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID( 從1開始),一旦消息被投遞到所有匹配的隊列之後,RabbitMQ就會發送一個確認(BasicAck) 給生產者(包含消息的唯一ID),這就使得生產者知曉消息已經正確到達了目的地了。如果消息和隊列是可持久化的,那麽確認消息會在消息寫入磁盤之後發出。

技術分享圖片

????發送方確認模式,示例代碼如下:

 //示例1--同步等待
 channel.ConfirmSelect();//開啟確認模式
 var message = Encoding.UTF8.GetBytes("TestMsg");
 channel.ExchangeDeclare("normalExchange", "direct", true, false, null);
 channel.QueueDeclare("normalQueue", true, false, false, null);
 channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey");
 channel.BasicPublish("normalExchange", "NormalRoutingKey", true, null, message);
 //var result=channel.WaitForConfirmsOrDie(Timeout); 
 //WaitForConfirmsOrDie 使用WaitForConfirmsOrDie 在Rabbit發送Nack命令或超時時會拋出一個異常
 var result = channel.WaitForConfirms();//等待該信道所有未確認的消息結果
 if(!result){
     //send message failed;
 }
 //示例2--異步通知
 channel.ConfirmSelect();//開啟確認模式
 var message = Encoding.UTF8.GetBytes("TestMsg");
 channel.ExchangeDeclare("normalExchange", "direct", true, false, null);
 channel.QueueDeclare("normalQueue", true, false, false, null);
 channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey");
 channel.BasicPublish("normalExchange", "NormalRoutingKey", true, null, message);
 channel.BasicAcks += (model, ea) =>
 {
     //消息被投遞到所有匹配的隊列之後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一ID)
     //ea.Multiple為True代表 ea.DeliveryTag編號之前的消息均已被確認。
    //do something;
 };
 channel.BasicNacks += (model, ea) =>
 {
     //如果RabbitMQ 因為自身內部錯誤導致消息丟失,就會發送一條nack(BasicNack) 命令
    //do something;
 };

????關於生產者確認機制同樣會有一些問題,broker不能保證消息會被confirm,只知道將會進行confirm。這樣如果broker與生產者之間的連接斷開,導致生產者不能收到確認消息,可能會重復進行發布。總之,生產者確認模式給客戶端提供了一種較為輕量級的方式,能夠跟蹤哪些消息被broker處理,哪些可能因為broker宕掉或者網絡失敗的情況而重新發布。

????註意:事務機制和publisher confirm機制兩者是互斥的,不能共存。如果企圖將已開啟事務模式的信道再設置為publisher confmn模式, RabbitMQ會報錯,或者如果企圖將已開啟publisher confirm模式的信道設置為事務模式, RabbitMQ也會報錯。在性能上來看,而到底應該選擇事務機制還是Confirm機制,則需要結合我們的業務場景。

四、消費者確認

????為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement)。消費者在訂閱隊列時,可以指定noAck參數,當noAck等於false時,RabbitMQ會等待消費者顯式地回復確認信號後才從內存(或者磁盤)中移去消息(實質上是先打上刪除標記,之後再刪除)。當noAck等於true時,RabbitMQ會自動把發送出去的消息置為確認,然後從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。

????采用消息確認機制後,只要設置noAck參數為false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉後消息丟失的問題,因為RabbitMQ會一直等待持有消息直到消費者顯式調用BasicAck命令為止。

????當noAck參數置為false,對於RabbitMQ服務端而言,隊列中的消息分成了兩個部分:一部分是等待投遞給消費者的消息:一部分是己經投遞給消費者,但是還沒有收到消費者確認信號的消息。如果RabbitMQ 一直沒有收到消費者的確認信號,並且消費此消息的消費者己經斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。

????RabbitMQ不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否己經斷開,這麽設計的原因是RabbitMQ 允許消費者消費一條消息的時間可以很久很久。

????關於RabbitMQ消費者確認機制示例代碼如下:

  //推模式
  EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
  //定義消費者回調事件
  consumer.Received += (model, ea) =>
  {
      //do someting;
      //channel.BasicReject(ea.DeliveryTag, requeue: true);//拒絕
      //requeue參數為true會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者
      channel.BasicAck(ea.DeliveryTag, multiple: false);//確認
      //若:multiple參數為true,則確認DeliverTag這個編號之前的消息
  };
  channel.BasicConsume(queue: "queueName",
                      noAck: false,
                     consumer: consumer);

  //拉模式
  BasicGetResult result = channel.BasicGet("queueName", noAck: false);
  //確認
  channel.BasicAck(result.DeliveryTag, multiple: false);

技術分享圖片

????如上,消費者在消費消息的同時,Rabbit會同步給予消費者一個DeliveryTag,這個DeliveryTag就像我們數據庫中的主鍵,消費者在消費完畢後拿著這個DeliveryTag去Rabbit確認或拒絕這個消息。

void BasicAck(ulong deliveryTag, bool multiple);

void BasicReject(ulong deliveryTag, bool requeue);

void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
  • deliveryTag:可以看作消息的編號,它是一個64位的長整型值,最大值是9223372036854775807。
  • requeue:如果requeue 參數設置為true,則RabbitMQ會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者;如果requeue 參數設置為false,則RabbitMQ立即會把消息從隊列中移除,而不會把它發送給新的消費者。
  • BasicReject命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個命令。
  • multiple:在BasicAck中,multiple 參數設置為true 則表示確認deliveryTag編號之前所有已被當前消費者確認的消息。在BasicNack中,multiple 參數設置為true 則表示拒絕deliveryTag 編號之前所有未被當前消費者確認的消息。

????說明:將channel.BasicReject 或者channel.BasicNack中的requeue設置為false ,可以啟用"死信隊列"的功能。(關於死信隊列請看我的上一篇文章 https://www.cnblogs.com/hunternet/p/9697754.html)。

????上述requeue,都會將消息重新存入隊列發送給下一個消費者(也有可能是其它消費者)。關於requeue還有下面一種用法。可以選擇是否補發給當前的consumer。

//補發消息 true退回到queue中 /false只補發給當前的consumer
channel.BasicRecover(true);

????註意:RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。如果忘記了ack,那麽後果很嚴重。當Consumer退出時,Message會重新分發。然後RabbitMQ會占用越來越多的內存,由於RabbitMQ會長時間運行,這個“內存泄漏”是致命的。

五、消息分發與順序

5.1 消息分發

????當RabbitMQ 隊列擁有多個消費者時,隊列收到的消息將以輪詢(round-robin)的分發方式發送給消費者。每條消息只會發送給訂閱列表裏的一個消費者。這種方式非常適合擴展,而且它是專門為並發程序設計的。如果現在負載加重,那麽只需要創建更多的消費者來消費處理消息即可。
????很多時候輪詢的分發機制也不是那麽優雅。默認情況下,如果有n個消費者,那麽RabbitMQ會將第m條消息分發給第m%n (取余的方式)個消費者, RabbitMQ 不管消費者是否消費並己經確認了消息。試想一下,如果某些消費者任務繁重,來不及消費那麽多的消息,而某些其他消費者由於某些原因(比如業務邏輯簡單、機器性能卓越等)很快地處理完了所分配到的消息,進而進程空閑,這樣就會造成整體應用吞吐量的下降。那麽該如何處理這種情況呢?這裏就要用到channel.BasicQos(int prefetchCount)這個方法,channel.BasicQos方法允許限制信道上的消費者所能保持的最大未確認消息的數量。
????舉例說明,在訂閱消費隊列之前,消費端程序調用了channel.BasicQos(5),之後訂閱了某個隊列進行消費。RabbitMQ 會保存一個消費者的列表,每發送一條消息都會為對應的消費者計數,如果達到了所設定的上限,那麽RabbitMQ 就不會向這個消費者再發送任何消息。直到消費者確認了某條消息之後, RabbitMQ 將相應的計數減1,之後消費者可以繼續接收消息,直到再次到達計數上限。

註意:Basic.Qos 的使用對於拉模式的消費方式無效.

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
  • prefetchCount:允許限制信道上的消費者所能保持的最大未確認消息的數量,設置為0表示沒有上限。
  • prefetchSize:消費者所能接收未確認消息的總體大小的上限,單位為B,設置為0表示沒有上限。
  • global:對於一個信道來說,它可以同時消費多個隊列,當設置了prefetchCount 大於0 時,這個信道需要和各個隊列協調以確保發送的消息都沒有超過所限定的prefetchCount 的值,這樣會使RabbitMQ 的性能降低,尤其是這些隊列分散在集群中的多個Broker節點之中。RabbitMQ 為了提升相關的性能,在AMQPO-9-1 協議之上重新定義了global這個參數。如下表所示:
global參數 AMQP 0-9-1 RabbitMQ
false 信道上所有的消費者都需要遵從prefetchCount 的限信道上新的消費者需要遵從prefetchCount 的限定值 信道上新的消費者需要遵從prefetchCount 的限定值
true 當前通信鏈路( Connection) 上所有的消費者都需信道上所有的消費者都需要遵從prefetchCount的限定值 信道上所有的消費者需要遵從prefetchCount 的限定值

註意:

  1. 對於同一個信道上的多個消費者而言,如果設置了prefetchCount 的值,那麽都會生效。
//偽代碼
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.BasicQos(10) ; 
channel.BasicConsume("my-queue1" , false , consumer1);
channel.BasicConsume("my-queue2" , false , consumer2);
//兩個消費者各自的能接收到的未確認消息的上限都為10 。
  1. 如果在訂閱消息之前,既設置了global 為true 的限制,又設置了global為false的限制,RabbitMQ 會確保兩者都會生效。但會增加RabbitMQ的負載因為RabbitMQ 需要更多的資源來協調完成這些限制。
//偽代碼
Channel channel = ...;
Consumer consumerl = ...;
Consumer consumer2 = ...;
channel.BasicQos(3 , false); 
channel.BasicQos(5 , true); 
channel.BasicConsume("queuel" , false , consumerl) ;
channel.BasicConsume("queue2" , false , consumer2) ;
//這裏每個消費者最多只能收到3個未確認的消息,兩個消費者能收到的未確認的消息個數之和的上限為5

5.2 消息順序

????消息的順序性是指消費者消費到的消息和發送者發布的消息的順序是一致的。舉個例子,不考慮消息重復的情況,如果生產者發布的消息分別為msgl、msg2、msg3,那麽消費者必然也是按照msgl、msg2、msg3的順序進行消費的。
????目前很多資料顯示RabbitMQ的消息能夠保障順序性,這是不正確的,或者說這個觀點有很大的局限性。在不使用任何RabbitMQ的高級特性,也沒有消息丟失、網絡故障之類異常的情況發生,並且只有一個消費者的情況下,最好也只有一個生產者的情況下可以保證消息的順序性。如果有多個生產者同時發送消息,無法確定消息到達Broker 的前後順序,也就無法驗證消息的順序性。
????那麽哪些情況下RabbitMQ 的消息順序性會被打破呢?下面介紹幾種常見的情形。

  • 如果生產者使用了事務機制,在發送消息之後遇到異常進行了事務回滾,那麽需要重新補償發送這條消息,如果補償發送是在另一個線程實現的,那麽消息在生產者這個源頭就出現了錯序。同樣,如果啟用publisher confirm時,在發生超時、中斷,又或者是收到RabbitMQ的BasicNack命令時,那麽同樣需要補償發送,結果與事務機制一樣會錯序。或者這種說法有些牽強,我們可以固執地認為消息的順序性保障是從存入隊列之後開始的,而不是在發叠的時候開始的。

  • 考慮另一種情形,如果生產者發送的消息設置了不同的超時時間,井且也設置了死信隊列,整體上來說相當於一個延遲隊列,那麽消費者在消費這個延遲隊列的時候,消息的順序必然不會和生產者發送消息的順序一致。

  • 如果消息設置了優先級,那麽消費者消費到的消息也必然不是順序性的。

  • 如果一個隊列按照前後順序分有msg1, msg2、msg3、msg4這4 個消息,同時有ConsumerA和ConsumerB 這兩個消費者同時訂閱了這個隊列。隊列中的消息輪詢分發到各個消費者之中,ConsumerA 中的消息為msg1和msg3,ConsumerB中的消息為msg2、msg4。ConsumerA收到消息msg1之後並不想處理而調用了BasicNack/BasicReject將消息拒絕,與此同時將requeue設置為true,這樣這條消息就可以重新存入隊列中。消息msg1之後被發送到了ConsumerB中,此時ConsumerB己經消費了msg2、msg4,之後再消費msg1.這樣消息順序性也就錯亂了。

????包括但不僅限於以上幾種情形會使RabbitMQ 消息錯序。如果要保證消息的順序性,需要業務方使用的時候做進一步的處理。如在消息體內添加全局有序標識等。

六、消息傳輸保障

????消息可靠傳輸一般是業務系統接入消息中間件時首要考慮的問題,一般消息中間件的消息
傳輸保障分為三個層級。

  • At most once: 最多一次。消息可能會丟失,但絕不會重復傳輸。
  • At least once: 最少一次。消息絕不會丟失,但可能會重復傳輸。
  • Exactly once: 恰好一次。每條消息肯定會被傳輸一次且僅傳輸一次。

????RabbitMQ 支持其中的"最多一次"和"最少一次"。其中"最少一次"投遞實現需要考慮以下這個幾個方面的內容:

  1. 消息生產者需要開啟事務機制或者publisher confirm 機制,以確保消息可以可靠地傳
    輸到RabbitMQ 中。
  2. 消息生產者需要配合使用mandatory參數或者備份交換器來確保消息能夠從交換器
    路由到隊列中,進而能夠保存下來而不會被丟棄。
  3. 消息和隊列都需要進行持久化處理,以確保RabbitMQ服務器在遇到異常情況時不會造成消息丟失。
  4. 消費者在消費消息的同時需要將noAck設置為false,然後通過手動確認的方式去確認己經正確消費的消息,以避免在消費端引起不必要的消息丟失。

????"最多一次"的方式就無須考慮以上那些方面,生產者隨意發送,消費者隨意消費,不過這樣很難確保消息不會重復消費。

????"恰好一次"是RabbitMQ目前無法保障的(目前我也不知道哪個中間件能夠保證)。消費者在消費完一條消息之後向RabbitMQ 發送確認BasicAck命令,此時由於網絡斷開或者其他原因造成RabbitMQ並沒有收到這個確認命令,那麽RabbitMQ不會將此條消息標記刪除。在重新建立連接之後,消費者還是會消費到這一條消息,這就造成了重復消費。再考慮一種情況,生產者在使用publisher confirm機制的時候,發送完一條消息等待RabbitMQ 返回確認通知,此時網絡斷開,生產者捕獲到異常情況,為了確保消息可靠性選擇重新發送,這樣RabbitMQ中就有兩條同樣的消息,在消費的時候,消費者就會重復消費。而解決重復消費可以通過消費者冪等等方式來解決。

結束語

????本篇文章,我們思考了使用RabbitMQ過程中需要註意的幾個問題,而前兩篇文章對RabbitMQ的概念以及如何使用做了簡單的介紹,相信經過這些介紹已經對RabbitMQ有了基本的了解。但這些遠遠不夠,想要更好的利用好RabbitMQ還需要結合我們的業務場景來更多的去使用它(切記不要為了使用技術而使用技術!)。關於RabbitMQ的運維篇,會在以後的文章中繼續給大家分享。

分布式系統消息中間件——RabbitMQ的使用思考篇