RabbitMQ之Pub/Sub模式 (訂閱模式)
阿新 • • 發佈:2021-10-04
RabbitMQ之Pub/Sub模式 (訂閱模式)
概念
在前面的模式中 一個訊息都只能被一個消費者使用
但是在閱讀模式一個訊息可以被多個消費者使用
簡單例子
交換機建立引數:
編寫生產者 建立一個交換機和兩個佇列 並做好關係繫結
public class PubProducer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.198.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立連線 connection = connectionFactory.newConnection("生產者"); //獲取通道 channel = connection.createChannel(); //建立交換機以及兩個佇列 同時繫結關係 String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,false,false,false,null); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,false,false,false,null); channel.queueDeclare(queue2Name,false,false,false,null); //繫結關係 第三個引數為routingKey 繫結規則 fanout使用"" channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //關閉通道 if(channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e){ e.printStackTrace(); } } if(connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e){ e.printStackTrace(); } } } } }
執行程式後在web頁面檢視交換機繫結關係
消費者編寫和之前沒區別 給好佇列名字即可
public class PubConsumer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.198.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立連線 connection = connectionFactory.newConnection("消費者"); //獲取通道 channel = connection.createChannel(); //通過通道宣告佇列,建立交換機等一系列事情 channel.basicConsume("test_fanout_queue1", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("1號消費者接受到的訊息為 " + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("收取訊息失敗"); } }); //卡一下 System.out.println("鍵盤輸入關閉消費者"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //關閉通道 if(channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e){ e.printStackTrace(); } } if(connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e){ e.printStackTrace(); } } } } }
另外一個消費者程式碼改為佇列2即可
執行測試:
先執行兩個消費者等待生產者訊息
執行生產者後雖然只有一條訊息但是可以看到兩個消費者都拿到訊息了