RabbitMQ之佇列與訊息持久化
阿新 • • 發佈:2019-02-19
佇列持久化
在之前的例子中,我們所用的佇列都是臨時佇列,當服務重啟後之前建立的佇列就都沒有了。
佇列的持久化是在定義佇列時的第二個引數決定的(false為佇列不用持久化)
- channel.queueDeclare(queueName, false, false,false,null);
訊息持久化
如果要在重啟後保持訊息的持久化必須設定訊息是持久化的標誌- channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
示例程式碼
傳送端:- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- /**
- * 訊息持久化
- * @author menghh
- *
- */
- publicclass Send04 {
- publicstaticvoid main(String[] args) throws IOException {
- ConnectionFactory factory = new ConnectionFactory();
- //RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
-
factory.setHost("127.0.0.1"
- //建立一個連線
- Connection conn = factory.newConnection();
- //建立一個通訊通道
- Channel channel = conn.createChannel();
- //定義Queue名稱
- String queueName = "queue01";
- //為Channel定義queue的屬性,queueName為queue名稱
- channel.queueDeclare(queueName, true, false,false,null);
- String msg = "Hello World!";
- //傳送訊息
- /**
- * 測試條件:1、在訊息佇列持久化的前提下2、接收訊息方設定接收方式為手動接收,並不對接收訊息進行確認
- * 不採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息接收不到
- * 採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息依然可以接收到訊息,說明訊息被持久化
- */
- channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
- // channel.basicPublish("", queueName, null, msg.getBytes());
- System.out.println("send message["+msg+"] to "+queueName+"success!");
- //關閉通道
- channel.close();
- //關閉連線
- conn.close();
- }
- }
接收端(跟之前程式碼一樣)
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.QueueingConsumer.Delivery;
- /**
- * 訊息持久化
- * @author menghh
- *
- */
- publicclass Recv04 {
- publicstaticvoid main(String[] args) throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- Connection conn = factory.newConnection();
- Channel channel = conn.createChannel();
- String queueName = "queue01";
- channel.queueDeclare(queueName, true, false, false, null);
- //以上部分和sender一樣
- //配置好獲取訊息得方式
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, false,consumer);
- //迴圈獲取訊息
- while(true){
- //獲取訊息,如果沒有訊息,這一步將會一直阻塞
- Delivery delivery = consumer.nextDelivery();
- String msg = new String(delivery.getBody());
- //確認訊息,已經收到
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
- System.out.println("received message["+msg+"] from "+queueName);
- }
- }
- }
測試結果:
執行程式後,佇列存在,重啟RabbitMQ Server後佇列依然存在訊息持久化的測試方法: 把消費者中確認接收訊息的程式碼註釋掉(前邊提到過該操作),啟動傳送訊息程式,重啟RabbitMQ Server後訊息依然可以接收到