RabbitMQ訊息的持久化
阿新 • • 發佈:2018-12-30
RabbitMQ訊息持久化需要將訊息和佇列都持久化
佇列持久化
//為Channel定義queue的屬性,queueName為queue名稱 第二個引數持久化標誌,為true表示持久化
channel.queueDeclare(queueName, true, false,false,null);
訊息持久化
傳送端/** * 測試條件:1、在訊息佇列持久化的前提下2、接收訊息方設定接收方式為手動接收,並不對接收訊息進行確認 * 不採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息接收不到 * 採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息依然可以接收到訊息,說明訊息被持久化 */ channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
接收端package cn.rabbitmq.disk; import java.io.IOException; import cn.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; public class SendTest { private final static String EXCHANGE_NAME = "logs1"; public static void main(String[] args) throws IOException { /** * 建立連線連線到MabbitMQ */ Connection connection = ConnectionUtil.getConnection(); // 建立一個頻道 Channel channel = connection.createChannel(); String queueName = "queue01"; //為Channel定義queue的屬性,queueName為queue名稱 第二個引數持久化標誌,為true表示持久化 channel.queueDeclare(queueName, true, false,false,null); String msg = "Hello World!"; //傳送訊息 /** * 測試條件:1、在訊息佇列持久化的前提下2、接收訊息方設定接收方式為手動接收,並不對接收訊息進行確認 * 不採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息接收不到 * 採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息依然可以接收到訊息,說明訊息被持久化 */ channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // channel.basicPublish("", queueName, null, msg.getBytes()); System.out.println("send message["+msg+"] to "+queueName+"success!"); //關閉通道 channel.close(); //關閉連線 connection.close(); } }
package cn.rabbitmq.disk; import cn.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class RecTest { public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); // 建立一個頻道 Channel channel = connection.createChannel(); String queueName = "queue01"; channel.queueDeclare(queueName, true, false, false, null); //以上部分和sender一樣 //配置好獲取訊息得方式 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, false,consumer); //迴圈獲取訊息 while(true){ //獲取訊息,如果沒有訊息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); //確認訊息,已經收到 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); System.out.println("received message["+msg+"] from "+queueName); } } }