1. 程式人生 > >rabbitmq6-路由模式

rabbitmq6-路由模式

一、前言:

大體的應用場景在前面都已經分析過,這裡面只是最基礎的rabbitmq的基礎的教程的使用方法,後面我們在實際使用的過程中是會結合springboot來開展的,所以這裡我們迅速的把基礎過萬,然後迅速進入實戰的部分。但是這裡建議還是把前面的相關的博文看一看,因為springboot封裝了rabbitmq的一些操作,但是你要知道封裝的內容是什麼,不要求完全精細的掌握底層,但是也要知道個大概,這樣有問題起碼你能有一個明確的思路,就是google或者百度你起碼能有一個關鍵詞來搜尋。

二、兩種模式:

image.png
圖上的兩種繫結模式都是合法的:
- 相同的routing key 繫結不同的佇列
- 不同的routing key 繫結同一個佇列
有人可能會說前面我話的圖routing key是在交換機和佇列之間binding key是在消費者和佇列之間,這裡申明一下這個是沒有問題的,因為按照Exchange的type來進行匹配的時候都是在佇列裡面進行匹配的,所以這在圖上是沒有問題的。

三、程式碼:

3.1、生產者:
  • 這裡面我們演示圖二的情況,不同的routing key繫結同一個佇列。事實上進行這個實驗的完整做法是在傳送訊息的時候傳入routing key的,但是這裡為了簡單演示,所以直接傳送兩次訊息。
public class Producer {
    public static final String EXCHANGE_NAME = "routing_model";
    public static final String ROUTING_KEY1 = "routing_key1";
    public static final
String ROUTING_KEY2 = "routing_key2"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnUtils.getConn(); Channel channel = conn.createChannel(); /** * 注意下面我們持久化的是交換機,已經不是持久化的隊列了,交換機是不存放訊息的,只是一個訊息的搬運工 */ String exchangeType = BuiltinExchangeType.DIRECT.getType(); boolean
durable = true; boolean autoDelete = false; boolean internal = false; Map<String,Object> arguments = null; // 宣告一個交換機 channel.exchangeDeclare(EXCHANGE_NAME, exchangeType,durable,autoDelete,internal,arguments); /** * 繫結一個佇列,說明這個物件了對這個交換機上的資料感興趣,下面是繫結的規則: * 1、同一個routing key 可以繫結多個佇列 * \---routing key1 ----Q1 * P --- X * \---routing key2 ----Q2 * 2、不同的 routing key繫結一個佇列 * \----routing key1 -----Q1 * P ----- X * \----routing kye2 ----------| * \ Q2 * \----routing key3 ----------\ * * 下面我們演示第二種, */ /** * 下面是依據我的個人理解寫出來的,如果大家有不同的意見可以留言或者加群交流 * 在路由的模式下,生產者是不需要繫結佇列的,只是宣告交換機,同時交換機是無法存放訊息的, * 假如我們直接宣告交換機,然後傳送訊息,但是消費者還有繫結宣告佇列,那麼這個訊息就有可能丟失,所以必須先建立交換機以及佇列 * 然後生產者傳送訊息,消費者接受 * * 這個正常的做法應該是把 routing key作為方法的引數傳遞過來的,這裡面由於我們是做測試,就直接傳送兩個訊息以不同的routing可以 * 來發送 */ channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY1,null,"CEUIXCXI routing key1".getBytes()); channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY2,null,"CEUIXCXI routing key2".getBytes()); } }

希望大家能多看看我程式碼中的註釋資訊:
- 上面的持久化指的是交換機的持久化,現在我們接觸到的持久化有:訊息的持久化、交換機的持久化。
- 進行上面的實驗,我們要先建立交換機,所以在執行的時候一定要先執行以下上面的程式碼進行建立交換機,但是我們發現一個問題,我們執行上述的兩碼,由於我們沒有執行消費者的關係,我們發現在web控制檯我們檢視不到訊息。是的,我們檢視不到,大家記住交換機的作用,交換機只是訊息的搬運工,既不生產訊息亦不儲存訊息
- 記住上面我們使用的模式是Exchanage的direct交換機模式

3.2、消費者:
public class Consume001 {
    public static final String EXCHANGE_NAME = "routing_model";
    public static final String QUEUE_NAME1 = "routing_queue1";
    public static final String ROUTING_KEY1 = "routing_key1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
       channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,ROUTING_KEY1);
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_NAME1,false,consumer);
    }
}
public class Consume002 {
    public static final String EXCHANGE_NAME = "routing_model";
    public static final String QUEUE_NAME1 = "routing_queue1";
    public static final String ROUTING_KEY1 = "routing_key2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,ROUTING_KEY1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME1,false,consumer);

    }
}

可以看到我們使用不同的routing key綁定了同一個佇列
image.png
image.png

大概就是這樣了,如果大家有疑問歡迎留言和加群討論。