RabbitMQ原理三--訊息持久化
原文地址:http://www.cnblogs.com/ericli-ericli/p/5938106.html
問題及方案描述
1.當有多個消費者同時收取訊息,且每個消費者在接收訊息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程中可能會出現一些意外,比如訊息接收到一半的時候,一個消費者死掉了。
這種情況要使用訊息接收確認機制,可以執行上次宕機的消費者沒有完成的事情。
2.在預設情況下,我們程式建立的訊息佇列以及存放在佇列裡面的訊息,都是非持久化的。當RabbitMQ死掉了或者重啟了,上次建立的佇列、訊息都不會儲存。
這種情況可以使用RabbitMQ提供的訊息佇列的持久化機制。
相關理論描述
RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我個人覺得大多數開發人員都會選擇持久化。
佇列和交換機有一個建立時候指定的標誌durable。durable的唯一含義就是具有這個標誌的佇列和交換機會在重啟之後重新建立,它不表示說在隊列當中的訊息會在重啟後恢復。
訊息佇列持久化包括3個部分:
1、exchange持久化,在宣告時指定durable => true
2、queue持久化,在宣告時指定durable => true
3、訊息持久化,在投遞時指定delivery_mode=> 2(1是非持久化)
如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。
注意:一旦建立了佇列和交換機,就不能修改其標誌了。例如,如果建立了一個non-durable的佇列,然後想把它改變成durable的,唯一的辦法就是刪除這個佇列然後重現建立。
程式示例
生產者
class Producter { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//宣告訊息佇列,且為可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//宣告訊息佇列,且為可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); string message = "Eric is very handsome"; var body = Encoding.UTF8.GetBytes(message); //將佇列設定為持久化之後,還需要將訊息也設為可持久化的 var props = channel.CreateBasicProperties(); props.SetPersistent(true); channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body); Console.WriteLine("Producter Sent: {0}", message); Console.ReadKey(); } } }
注:ack是 acknowledgments 的縮寫,noAck 是("no manual acks")
因為我前段時間換了筆記本,所以使用者的“eric”的操作出踩了個坑,下面進行介紹下:
如果除錯執行時報錯:None of the specified endpoints were reachable
innerException是:
{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text=\"Unexpected Exception\", classId=0, methodId=0, cause=System.IO.IOException: 無法從傳輸連線中讀取資料: 遠端主機強迫關閉了一個現有的連線。。 ---> System.Net.Sockets.SocketException: 遠端主機強迫關閉了一個現有的連線。\r\n 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)\r\n 在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)\r\n --- 內部異常堆疊跟蹤的結尾 ---\r\n 在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)\r\n 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}
這說明我們使用的使用者 不是 系統預設的 guest 而是我們自己建立的使用者,但是沒有足夠的許可權進行操作。
解決辦法:
rabbitmqctl set_user_tags username administrator rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
執行結果:
程式執行結果:
消費者
class Recevice { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//宣告訊息佇列,且為可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//宣告訊息佇列,且為可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true); //NoAck:true 告訴RabbitMQ立即從佇列中刪除訊息,另一個非常受歡迎的方式是從佇列中刪除已經確認接收的訊息,可以通過單獨呼叫BasicAck 進行確認: //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false); var msgContent = Encoding.UTF8.GetString(msgResponse.Body); Console.WriteLine("The received content:"+msgContent); channel.BasicAck(msgResponse.DeliveryTag, multiple: false); //使用BasicAck方式來告之是否從佇列中移除該條訊息 //需要額外注意,比如從佇列中獲取訊息並用它來操作資料庫或日誌檔案時,如果出現操作失敗時,則該條訊息應該保留在佇列中,只到操作成功時才從佇列中移除。 Console.ReadKey(); } } }
接受訊息還有一種方法,就是通過基於推送的事件訂閱。可以使用內建的 QueueingBasicConsumer 提供簡化的程式設計模型,允許在共享佇列上阻塞,直到收到一條訊息。
var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(QueueName, noAck: true, consumer: consumer); var msgResponse = consumer.Queue.Dequeue(); var msgContent = Encoding.UTF8.GetString(msgResponse.Body);
程式執行結果: