rabbitmq學習(三):rabbitmq之扇形交換機、主題交換機
阿新 • • 發佈:2018-12-02
前言
上篇我們學習了rabbitmq的作用以及直連交換機的程式碼實現,這篇我們繼續看如何用程式碼實現扇形交換機和主題交換機
一、扇形交換機
1.生產者
/** * 生產者 */ public class LogProducer { //交換機名稱 public final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection= null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); for (int i = 0; i < 5;i++){ String message = "Hello Rabbit " + i; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("EmitLog send message " + message); } } catch (IOException e) { e.printStackTrace(); } finally { try { channel.close(); connection.close(); }catch (IOException e) { e.printStackTrace(); } } } }
2.消費者
Consumer1
/** * 消費者 */ public class Consumer1 { public final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); //宣告一個交換機,釋出模式為fanout-扇形 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //將佇列和交換機繫結起來,因為扇形交換機和路由鍵無關,所以這裡路由鍵設為空字串即可 channel.queueBind(queueName,EXCHANGE_NAME,""); QueueingConsumer consumer = new QueueingConsumer(channel); //當連線斷開時,佇列會自動被刪除 channel.basicConsume(queueName,true,consumer); System.out.println("ReceiveLogTopic1 Waitting for message"); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ReceiveLogTopic1 receives message " + message); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Cosumer2
/** * 消費者2 */ public class Consumer2 { public final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); //宣告一個交換機,釋出模式為fanout-扇形 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //將佇列和交換機繫結起來,因為扇形交換機和路由鍵無關,所以這裡路由鍵設為空字串即可 channel.queueBind(queueName,EXCHANGE_NAME,""); QueueingConsumer consumer = new QueueingConsumer(channel); //當連線斷開時,佇列會自動被刪除 channel.basicConsume(queueName,true,consumer); System.out.println("ReceiveLog2 Waitting for message"); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ReceiveLog2 receives message " + message); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
先啟動Consumer1,Consumer2,再啟動LogProducer。結果如下:
LogProducer:
Consumer1:
Consumer2:
從輸出結果中我們可以看出,扇形交換機所接受到的訊息會被分發到所有繫結到該交換機上的佇列中,和路由鍵無關。
二、主題交換機
1.生產者
/** * 生產者 */ public class Producer { private static final String EXCHANGE_NAME = "topic_logs"; // 路由關鍵字 private static final String[] routingKeys = new String[]{ "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"}; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //宣告交換機 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //迴圈傳送具有不同routing key的message for (String routingKey : routingKeys) { String message = routingKey + "--->biu~"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("Producer -> routingkey: " + routingKey + ", send message " + message); } } catch (IOException e) { e.printStackTrace(); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
2.消費者
Consumer1
/** * 消費者1 */ public class Consumer1 { private static final String EXCHANGE_NAME = "topic_logs"; // 路由關鍵字 private static final String[] routingKeys = new String[]{"*.orange.*"}; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //宣告佇列 String queueName = channel.queueDeclare().getQueue(); //宣告交換機 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //將佇列與交換器用routingkey繫結起來 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, routingKey); System.out.println("Consumer1 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey); } //接收訊息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); System.out.println("Consumer1 waitting for message"); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); Envelope envelope = delivery.getEnvelope(); String routingKey = envelope.getRoutingKey(); System.out.println("Consumer1 receive message " + message + ", routingKey: " + routingKey); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer2
/** * 消費者2 */ public class Consumer2 { private static final String EXCHANGE_NAME = "topic_logs"; // 路由關鍵字 private static final String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //宣告佇列 String queueName = channel.queueDeclare().getQueue(); //宣告交換機 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //將佇列與交換器用routingkey繫結起來 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, routingKey); System.out.println("Consumer2 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey); } //接收訊息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); System.out.println("Consumer2 waitting for message"); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); Envelope envelope = delivery.getEnvelope(); String routingKey = envelope.getRoutingKey(); System.out.println("Consumer2 receive message " + message + ", routingKey: " + routingKey); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
同樣先執行消費者,再執行生產者,結果如下:
Producer:
Consumer1:
Consumer2:
由執行結果我們可以看到:訊息被交換機按照模式路由鍵的規則路由到相應的佇列中。