1. 程式人生 > 其它 >RabbitMQ學習交換機之fanout模型

RabbitMQ學習交換機之fanout模型

在這裡插入圖片描述
fanout:廣播型別,交換機會將訊息傳送到所有與之繫結的佇列中。
生產者程式碼

Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //宣告一個fanout型別交換機,
        channel.exchangeDeclare("publish_subscribe_exchange", BuiltinExchangeType.FANOUT,true,false,null);
        //宣告兩個佇列
channel.queueDeclare("publish_subscribe_queue1",true,false,false,null); channel.queueDeclare("publish_subscribe_queue2",true,false,false,null); //將兩個佇列與交換機進行繫結,因為是fanout型別,所以繫結時的routingKey可以設為"", // 在進行訊息傳送時的routingKey也設為""就行 channel.
queueBind("publish_subscribe_queue1","publish_subscribe_exchange",""); channel.queueBind("publish_subscribe_queue2","publish_subscribe_exchange",""); String body="來自publish_subscribe_exchange的訊息"; //在將訊息傳遞給交換機時指定交換機的名稱和routingKey,
//因為時Fanout型別交換機,所以將此處的routingKey設為與將佇列和交換機繫結時的routingKey相同即可,通常設為"" channel.basicPublish("publish_subscribe_exchange","",null,body.getBytes()); channel.close(); connection.close();

消費者1程式碼

 Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
             AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("publish_subscribe_consumer1接收到訊息->"+new String(body));
            }
        };
        channel.basicConsume("publish_subscribe_queue1",true,consumer);

消費者2程式碼

Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, 
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("publish_subscribe_consumer2接收到訊息->"+new String(body));
            }
        };
        channel.basicConsume("publish_subscribe_queue2",true,consumer);

總結
FanOut型別交換機在與佇列進行繫結時將routingKey設定為"",在將訊息傳送給交換機時,指定交換機的名稱和routingKey,fanout的routingkey設定為"",與佇列和交換機繫結時設定的routingKey相同

與Spring整合
生產者

<!--宣告fanout型別交換機-->
    <rabbit:queue id="spring_fanout_queue1" name="spring_fanout_queue1" auto-declare="true"/>
    <rabbit:queue id="spring_fanout_queue2" name="spring_fanout_queue2" auto-declare="true"/>
    <!--交換機與佇列繫結-->
    <rabbit:fanout-exchange name="spring_fanout_exchange" auto-declare="true" id="spring_fanout_exchange">
        <rabbit:bindings>
            <!--fanout型別交換機在於佇列繫結時不需要指定routingKey-->
            <rabbit:binding queue="spring_fanout_queue1"/>
            <rabbit:binding queue="spring_fanout_queue2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

生產者程式碼

rabbitTemplate.convertAndSend("spring_fanout_exchange","","訊息內容");
//指定交換機名稱,fanout型別的routingKey為""

向spring容器中注入:

@Component
public class Sping_mq_listener implements MessageListener {
    public void onMessage(Message message) {
        System.out.println("onMessage");
        System.out.println(new String(message.getBody()));
    }
}

通過在xml中配置佇列名進行監聽

 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2">
        <!--通過佇列名進行監聽-->
        <rabbit:listener ref="sping_mq_listener" queue-names="spring_topic_queue1"/>
    </rabbit:listener-container>

總結
在整合時,
生產者通過在xml檔案中宣告fanoout交換機和佇列進行繫結,繫結時指定交換機名稱和routingKey("")
消費者通過實現MessageListener介面,將其放入spring容器中,再通過xml配置其監聽的佇列

 <rabbit:listener-container connection-
        <!--通過佇列名進行監聽-->
        <rabbit:listener ref="sping_mq_listener" queue-names="spring_topic_queue1"/>
    </rabbit:listener-container>