RabbitMQ中連線工具類封裝
阿新 • • 發佈:2020-12-29
RabbitMQ中連線工具類封裝
- 建立工具類
- 連線工具類的封裝實現
package utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtils { private static ConnectionFactory connectionFactory; static { //重量級資源 類載入執行只執行一次 //建立連線mq的連線工廠物件 connectionFactory = new ConnectionFactory(); //設定連線rabbitmq主機 connectionFactory.setHost("rabbitmq主機"); //設定埠號 connectionFactory.setPort(5672); //設定連線哪個虛擬主機 connectionFactory.setVirtualHost("/虛擬主機"); //設定訪問虛擬主機的使用者名稱和密碼 connectionFactory.setUsername("使用者名稱"); connectionFactory.setPassword("密碼"); } //定義提供連線物件的方法 public static Connection getConnection() { try { return connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } //關閉通道和關閉連線工具方法 public static void closeConnectionAndChannel(Channel channel, Connection conn) { try { if (channel != null) { channel.close(); } if (conn != null) { conn.close(); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
-
釋出訊息
package helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Provider { //生產訊息 @Test public void testSendMessage() throws IOException, TimeoutException { //通過工具類獲取連線物件 Connection connection = RabbitMQUtils.getConnection(); //獲取連線中的通道 Channel channel = connection.createChannel(); //通道繫結對應訊息佇列 //引數1:佇列名稱 如果佇列不存在自動建立 //引數2:用來定義佇列特性是否要持久化 true 持久化佇列 false 不持久化 //引數3:exclusive 是否獨佔佇列 true 獨佔佇列 false 不獨佔 //引數4:autoDelete:是否在消費完成後自動刪除佇列 true 自動刪除 false 不自動刪除 //引數5:額外附加引數 channel.queueDeclare("hello", false, false, false, null); //釋出訊息 //引數1:交換機名稱 引數2:佇列名稱 引數3:傳遞訊息額外設定 引數4:訊息的具體內容 channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes()); //呼叫工具類 RabbitMQUtils.closeConnectionAndChannel(channel, connection); } }
- 消費訊息
package helloworld; import com.rabbitmq.client.*; import utils.RabbitMQUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Customer { public static void main(String[] args) throws IOException, TimeoutException { //通過工具類獲取連線 Connection connection = RabbitMQUtils.getConnection(); //建立通道 Channel channel = connection.createChannel(); //通道繫結物件 channel.queueDeclare("hello", false, false, false, null); //消費資訊 //引數1:消費哪個佇列的訊息 佇列名稱 //引數2:開始消費的自動確認機制 //引數3:消費時的回撥介面 channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override //最後一個引數:訊息佇列中取出的訊息 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("==========================" + new String(body)); } }); } }
5.專案結構
-
記得先引入引入rabbitmq的相關依賴
<!--引入rabbitmq的相關依賴--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency>