【夏目鬼鬼分享】RabbitMQ路由模式
阿新 • • 發佈:2020-12-09
技術標籤:rabbitmq
路由模式
路由模式是可以根據路由鍵
選擇性給多個消費者傳送訊息的模式,它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列通過路由鍵
繫結到交換機上去,生產者傳送訊息到交換機,交換機通過路由鍵
轉發到不同佇列,佇列繫結的消費者接收並消費訊息。
生產者
/** * Created by wzy on 2020/12/7 * 訊息傳送者 */ public class RouteSender { private final static String EXCHANGE_NAME = "myRoute"; private final static String ROUTE_KEY = "key2"; private final static String EXCHANGE_TYPE = "direct"; public static void main (String[] arsg) throws Exception{ //獲取連線 Connection connection = ConnectionUtil.getConnection(); //建立通道 Channel channel =connection.createChannel(); //宣告交換機 channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);//direct路由模式 //傳送內容 channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,("傳送"+ROUTE_KEY+"訊息").getBytes()); //關閉連線 channel.close(); connection.close(); } }
消費者1
/** * Created by wzy on 2020/12/7 * 訊息接收者1 */ public class RouteRecver1 { private final static String QUEUE = "routequeue1"; private final static String EXCHANGE_NAME = "myRoute"; public static void main (String[] arsg) throws Exception{ //獲取連線 Connection connection = ConnectionUtil.getConnection(); //建立通道 Channel channel =connection.createChannel(); //宣告佇列 channel.queueDeclare(QUEUE,false,false,false,null); channel.basicQos(1);//執行的快就多幹,執行的慢就少幹 //將交換器與佇列繫結 channel.queueBind(QUEUE,EXCHANGE_NAME,"key1"); channel.queueBind(QUEUE,EXCHANGE_NAME,"key3"); //定義接收者 boolean autoAck = false; channel.basicConsume(QUEUE, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("1body: " + new String(body)); //確認,false為確認收到訊息,true為拒絕收到訊息 channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
消費者2
/** * Created by wzy on 2020/12/7 * 訊息接收者2 */ public class RouteRecver2 { private final static String QUEUE = "routequeue2"; private final static String EXCHANGE_NAME = "myRoute"; public static void main (String[] arsg) throws Exception{ //獲取連線 Connection connection = ConnectionUtil.getConnection(); //建立通道 Channel channel =connection.createChannel(); //宣告佇列 channel.queueDeclare(QUEUE,false,false,false,null); channel.basicQos(1);//執行的快就多幹,執行的慢就少幹 //將交換器與佇列繫結 channel.queueBind(QUEUE,EXCHANGE_NAME,"key1"); channel.queueBind(QUEUE,EXCHANGE_NAME,"key2"); //定義接收者 boolean autoAck = false; channel.basicConsume(QUEUE, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("2body: " + new String(body)); //確認,false為確認收到訊息,true為拒絕收到訊息 channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
執行生產者後,消費者1和消費者2都收到訊息
修改路由key為了key2後,重新執行生產者後,只有消費者2收到訊息