1. 程式人生 > 其它 >RabbitMQ學習交換機之direct模型

RabbitMQ學習交換機之direct模型

在這裡插入圖片描述
和fanout型別的區別就是,在進行交換機和佇列繫結時的routingKey不再統一是"",指定routingKey進行繫結。同樣在傳送訊息到交換機時也需要指定routingKey,不再是""。這樣通過交換機與佇列繫結時的routngKey和傳送訊息是指定的routingKey進行對應,交換機就知道該將訊息傳送到哪個佇列。
生產者程式碼

Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel
(); channel.exchangeDeclare("publish_subscribe_direct_exchange", BuiltinExchangeType.DIRECT,true,false,null); channel.queueDeclare("publish_subscribe_direct_queue1",true,false,false,null); channel.queueDeclare("publish_subscribe_direct_queue2",true
,false,false,null); //繫結是可為一個佇列指定多個routingKey channel.queueBind("publish_subscribe_direct_queue1","publish_subscribe_direct_exchange","info"); channel.queueBind("publish_subscribe_direct_queue1","publish_subscribe_direct_exchange"
,"waring"); channel.queueBind("publish_subscribe_direct_queue2","publish_subscribe_direct_exchange","info"); String body1="來自publish_subscribe_direct_exchange的訊息,routingKey為:info"; channel.basicPublish("publish_subscribe_direct_exchange","info",null,body1.getBytes()); String body2="來自publish_subscribe_direct_exchange的訊息,routingKey為:waring"; channel.basicPublish("publish_subscribe_direct_exchange","waring",null,body2.getBytes());

消費者1程式碼

Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("publish_subscribe_direct_consumer1接收到訊息->"+new String(body));
            }
        };
        channel.basicConsume("publish_subscribe_direct_queue1",true,consumer);

消費者2程式碼

Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("publish_subscribe_direct_consumer2接收到訊息->"+new String(body));
            }
        };
        channel.basicConsume("publish_subscribe_direct_queue2",true,consumer);

與spring進行整合
生產者:xml檔案

<rabbit:direct-exchange name="direct_exchange" id="direct_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="spring_queue" key="info"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!--使用預設的direct型別交換機的佇列,交換機名稱為“”,預設routingKey為佇列名-->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

傳送訊息指定routingKey

rabbitTemplate.convertAndSend("direct_exchange","info","訊息內容");

消費者:通過重寫MessageListener介面並將其放入spring,並配置其監聽的佇列

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2">  
        <!--通過佇列名進行監聽-->
        <rabbit:listener ref="sping_mq_listener" queue-names="spring_queue"/>
    </rabbit:listener-container>

實現的MessageListerner介面

@Component
public class Sping_mq_listener implements MessageListener {
    public void onMessage(Message message) {
        System.out.println("onMessage");
        System.out.println(new String(message.getBody()));
    }
}

總結
direct型交換機與佇列進行繫結時需要指定routingKey,一個佇列可以有多個routingKey