RabbitMQ學習交換機之fanout模型
阿新 • • 發佈:2020-12-20
fanout:廣播型別,交換機會將訊息傳送到所有與之繫結的佇列中。
生產者程式碼
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//宣告一個fanout型別交換機,
channel.exchangeDeclare("publish_subscribe_exchange", BuiltinExchangeType.FANOUT,true,false,null);
//宣告兩個佇列
channel.queueDeclare("publish_subscribe_queue1",true,false,false,null);
channel.queueDeclare("publish_subscribe_queue2",true,false,false,null);
//將兩個佇列與交換機進行繫結,因為是fanout型別,所以繫結時的routingKey可以設為"",
// 在進行訊息傳送時的routingKey也設為""就行
channel. queueBind("publish_subscribe_queue1","publish_subscribe_exchange","");
channel.queueBind("publish_subscribe_queue2","publish_subscribe_exchange","");
String body="來自publish_subscribe_exchange的訊息";
//在將訊息傳遞給交換機時指定交換機的名稱和routingKey,
//因為時Fanout型別交換機,所以將此處的routingKey設為與將佇列和交換機繫結時的routingKey相同即可,通常設為""
channel.basicPublish("publish_subscribe_exchange","",null,body.getBytes());
channel.close();
connection.close();
消費者1程式碼
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("publish_subscribe_consumer1接收到訊息->"+new String(body));
}
};
channel.basicConsume("publish_subscribe_queue1",true,consumer);
消費者2程式碼
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("publish_subscribe_consumer2接收到訊息->"+new String(body));
}
};
channel.basicConsume("publish_subscribe_queue2",true,consumer);
總結
FanOut型別交換機在與佇列進行繫結時將routingKey設定為"",在將訊息傳送給交換機時,指定交換機的名稱和routingKey,fanout的routingkey設定為"",與佇列和交換機繫結時設定的routingKey相同
與Spring整合
生產者
<!--宣告fanout型別交換機-->
<rabbit:queue id="spring_fanout_queue1" name="spring_fanout_queue1" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue2" name="spring_fanout_queue2" auto-declare="true"/>
<!--交換機與佇列繫結-->
<rabbit:fanout-exchange name="spring_fanout_exchange" auto-declare="true" id="spring_fanout_exchange">
<rabbit:bindings>
<!--fanout型別交換機在於佇列繫結時不需要指定routingKey-->
<rabbit:binding queue="spring_fanout_queue1"/>
<rabbit:binding queue="spring_fanout_queue2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
生產者程式碼
rabbitTemplate.convertAndSend("spring_fanout_exchange","","訊息內容");
//指定交換機名稱,fanout型別的routingKey為""
向spring容器中注入:
@Component
public class Sping_mq_listener implements MessageListener {
public void onMessage(Message message) {
System.out.println("onMessage");
System.out.println(new String(message.getBody()));
}
}
通過在xml中配置佇列名進行監聽
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2">
<!--通過佇列名進行監聽-->
<rabbit:listener ref="sping_mq_listener" queue-names="spring_topic_queue1"/>
</rabbit:listener-container>
總結
在整合時,
生產者通過在xml檔案中宣告fanoout交換機和佇列進行繫結,繫結時指定交換機名稱和routingKey("")
消費者通過實現MessageListener介面,將其放入spring容器中,再通過xml配置其監聽的佇列
<rabbit:listener-container connection-
<!--通過佇列名進行監聽-->
<rabbit:listener ref="sping_mq_listener" queue-names="spring_topic_queue1"/>
</rabbit:listener-container>