RabbitMQ原理與相關操作(三)消息持久化
現在聊一下RabbitMQ消息持久化:
問題及方案描述
1.當有多個消費者同時收取消息,且每個消費者在接收消息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程中可能會出現一些意外,比如消息接收到一半的時候,一個消費者死掉了。
這種情況要使用消息接收確認機制,可以執行上次宕機的消費者沒有完成的事情。
2.在默認情況下,我們程序創建的消息隊列以及存放在隊列裏面的消息,都是非持久化的。當RabbitMQ死掉了或者重啟了,上次創建的隊列、消息都不會保存。
這種情況可以使用RabbitMQ提供的消息隊列的持久化機制。
相關理論描述
RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我個人覺得大多數開發人員都會選擇持久化。
隊列和交換機有一個創建時候指定的標誌durable。durable的唯一含義就是具有這個標誌的隊列和交換機會在重啟之後重新建立,它不表示說在隊列當中的消息會在重啟後恢復。
消息隊列持久化包括3個部分:
1、exchange持久化,在聲明時指定durable => true
2、queue持久化,在聲明時指定durable => true
3、消息持久化,在投遞時指定delivery_mode=> 2(1是非持久化)
如果exchange和queue都是持久化的,那麽它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
註意:一旦創建了隊列和交換機,就不能修改其標誌了。例如,如果創建了一個non-durable的隊列,然後想把它改變成durable的,唯一的辦法就是刪除這個隊列然後重現創建。
程序示例
生產者
View Code註:ack是 acknowledgments 的縮寫,noAck 是("no manual acks")
因為我前段時間換了筆記本,所以用戶的“eric”的操作出踩了個坑,下面進行介紹下:
如果調試運行時報錯:None of the specified endpoints were reachable
innerException是:
{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text=\"Unexpected Exception\", classId=0, methodId=0, cause=System.IO.IOException: 無法從傳輸連接中讀取數據: 遠程主機強迫關閉了一個現有的連接。。 ---> System.Net.Sockets.SocketException: 遠程主機強迫關閉了一個現有的連接。\r\n 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)\r\n 在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)\r\n --- 內部異常堆棧跟蹤的結尾 ---\r\n 在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)\r\n 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}
這說明我們使用的用戶 不是 系統默認的 guest 而是我們自己創建的用戶,但是沒有足夠的權限進行操作。
解決辦法:
rabbitmqctl set_user_tags username administrator rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
執行結果:
相關其他操作見:windows下 安裝 rabbitMQ 及操作常用命令
程序運行結果:
消費者
View Code接受消息還有一種方法,就是通過基於推送的事件訂閱。可以使用內置的 QueueingBasicConsumer 提供簡化的編程模型,允許在共享隊列上阻塞,直到收到一條消息。
View Code程序運行結果:
RabbitMQ原理與相關操作(三)消息持久化