RabbitMQ學習交換機之direct模型
阿新 • • 發佈:2020-12-20
和fanout型別的區別就是,在進行交換機和佇列繫結時的routingKey不再統一是"",指定routingKey進行繫結。同樣在傳送訊息到交換機時也需要指定routingKey,不再是""。這樣通過交換機與佇列繫結時的routngKey和傳送訊息是指定的routingKey進行對應,交換機就知道該將訊息傳送到哪個佇列。
生產者程式碼
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel ();
channel.exchangeDeclare("publish_subscribe_direct_exchange", BuiltinExchangeType.DIRECT,true,false,null);
channel.queueDeclare("publish_subscribe_direct_queue1",true,false,false,null);
channel.queueDeclare("publish_subscribe_direct_queue2",true ,false,false,null);
//繫結是可為一個佇列指定多個routingKey
channel.queueBind("publish_subscribe_direct_queue1","publish_subscribe_direct_exchange","info");
channel.queueBind("publish_subscribe_direct_queue1","publish_subscribe_direct_exchange" ,"waring");
channel.queueBind("publish_subscribe_direct_queue2","publish_subscribe_direct_exchange","info");
String body1="來自publish_subscribe_direct_exchange的訊息,routingKey為:info";
channel.basicPublish("publish_subscribe_direct_exchange","info",null,body1.getBytes());
String body2="來自publish_subscribe_direct_exchange的訊息,routingKey為:waring";
channel.basicPublish("publish_subscribe_direct_exchange","waring",null,body2.getBytes());
消費者1程式碼
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("publish_subscribe_direct_consumer1接收到訊息->"+new String(body));
}
};
channel.basicConsume("publish_subscribe_direct_queue1",true,consumer);
消費者2程式碼
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("publish_subscribe_direct_consumer2接收到訊息->"+new String(body));
}
};
channel.basicConsume("publish_subscribe_direct_queue2",true,consumer);
與spring進行整合
生產者:xml檔案
<rabbit:direct-exchange name="direct_exchange" id="direct_exchange">
<rabbit:bindings>
<rabbit:binding queue="spring_queue" key="info"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--使用預設的direct型別交換機的佇列,交換機名稱為“”,預設routingKey為佇列名-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
傳送訊息指定routingKey
rabbitTemplate.convertAndSend("direct_exchange","info","訊息內容");
消費者:通過重寫MessageListener介面並將其放入spring,並配置其監聽的佇列
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2">
<!--通過佇列名進行監聽-->
<rabbit:listener ref="sping_mq_listener" queue-names="spring_queue"/>
</rabbit:listener-container>
實現的MessageListerner介面
@Component
public class Sping_mq_listener implements MessageListener {
public void onMessage(Message message) {
System.out.println("onMessage");
System.out.println(new String(message.getBody()));
}
}
總結
direct型交換機與佇列進行繫結時需要指定routingKey,一個佇列可以有多個routingKey