rabbitmq工作模式---路由模式
阿新 • • 發佈:2022-03-02
模式說明:
- 佇列與交換機的繫結,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
- 訊息的傳送方在向Exchange傳送訊息時,也必須指定訊息的RoutingKey
- Exchange不再把訊息交給每一個繫結的佇列,而是根據訊息的RoutingKey進行判斷,只有佇列的Routingkey與訊息的Routingkey完全一致,才會接收到訊息
生產者示例程式碼
package com.rabbitmq.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 傳送訊息 */ public class Producer_Routing { public static void main(String[] args) throws IOException, TimeoutException { //1.建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設定引數 factory.setHost("172.16.98.133");//ip 預設值 localhost factory.setPort(5672); //埠 預設值 5672 factory.setVirtualHost("/bgt");//虛擬機器 預設值/ factory.setUsername("testuser");//使用者名稱 預設 guest factory.setPassword("111111");//密碼 預設值 guest //3. 建立連線 Connection Connection connection = factory.newConnection(); //4. 建立Channel Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 引數: 1. exchange:交換機名稱 2. type:交換機型別 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(廣播),傳送訊息到每一個與之繫結佇列。 TOPIC("topic"),萬用字元的方式 HEADERS("headers");引數匹配 3. durable:是否持久化 4. autoDelete:自動刪除 5. internal:內部使用。 一般false 6. arguments:引數 */ String exchangeName = "test_direct"; //5. 建立交換機 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); //6. 建立佇列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 繫結佇列和交換機 /* queueBind(String queue, String exchange, String routingKey) 引數: 1. queue:佇列名稱 2. exchange:交換機名稱 3. routingKey:路由鍵,繫結規則 如果交換機的型別為fanout ,routingKey設定為"" */ //佇列1繫結 error channel.queueBind(queue1Name,exchangeName,"error"); //佇列2繫結 info error warning channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String body = "日誌資訊:張三呼叫了delete方法...出錯誤了。。。日誌級別:error..."; //8. 傳送訊息 channel.basicPublish(exchangeName,"error",null,body.getBytes()); //9. 釋放資源 channel.close(); connection.close(); } }
消費者示例程式碼
消費者監聽佇列queue2Name,因此能接收到訊息.
package com.rabbitmq.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Routing1 { public static void main(String[] args) throws IOException, TimeoutException { //1.建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設定引數 factory.setHost("172.16.98.133");//ip 預設值 localhost factory.setPort(5672); //埠 預設值 5672 factory.setVirtualHost("/bgt");//虛擬機器 預設值/ factory.setUsername("testuser");//使用者名稱 預設 guest factory.setPassword("111111");//密碼 預設值 guest //3. 建立連線 Connection Connection connection = factory.newConnection(); //4. 建立Channel Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; /* basicConsume(String queue, boolean autoAck, Consumer callback) 引數: 1. queue:佇列名稱 2. autoAck:是否自動確認 3. callback:回撥物件 */ // 接收訊息 Consumer consumer = new DefaultConsumer(channel){ /* 回撥方法,當收到訊息後,會自動執行該方法 1. consumerTag:標識 2. envelope:獲取一些資訊,交換機,路由key... 3. properties:配置資訊 4. body:資料 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("將日誌資訊列印到控制檯....."); } }; //監聽佇列2 channel.basicConsume(queue2Name,true,consumer); } }
本文來自部落格園,作者:bgtong,轉載請註明原文連結:https://www.cnblogs.com/bgtong/p/15957467.html