1. 程式人生 > >RabbitMQ實戰教程(五)-路由模式

RabbitMQ實戰教程(五)-路由模式

1. 路由模式

跟訂閱模式類似,只不過在訂閱模式的基礎上加上路由,訂閱模式是分發到所有繫結到該交換機的佇列,路由模式只分發到繫結在該交換機上面指定的路由鍵佇列.

2. Direct 直接交換機

直連交換機(Direct Exchange)是一種帶路由功能的交換機,它將訊息中的Routing Key與該交換機關聯的所有Binding中的Routing Key進行比較,如果完全相等將訊息傳送到Binding對應的佇列中 .

適用場景 : 根據任務的優先順序把訊息傳送到對應的佇列中,分配更多資源處理優先順序高的佇列.

生產者宣告一個direct型別的交換機,然後傳送訊息到這個交換機指定路由鍵. 消費者指定消費交換機的路由鍵,即可以接到到訊息,其他消費者接收不到 .

Fanout交換機中

生產者 :

// 第二個引數就是路由鍵
channel.basicPublish(EXCHANE_NAME,"",null,message.getBytes());

消費者 :

// 第三個引數就是路由鍵
channel.queueBind(QUEUE,EXCAHNGE_NAME,"");

3. 程式碼演示

同樣的,只是交換機型別改為driect,加了個路由鍵而已 .

這裡演示3個,即表示一個日誌事件,根據日誌型別進行處理

3.1 工具類

package com.makesailing.neo.utils;

import com.rabbitmq.client.
Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * # RabbitMQ連線工具類 * * @Author: jamie.li * @Date: Created in 2018/9/16 14:38 */ public class ConnectionUtils { public static final String host = "127.0.0.1"; public
static final Integer port = 5672; public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); // 如果有 使用者名稱 密碼 vhost 配置即可 connectionFactory.setUsername("jamie"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/simple"); return connectionFactory.newConnection(); } }

3.2 日誌生產者

package com.makesailing.neo.provider;

import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * # 日誌訊息 提供者
 *
 * @Author: jamie.li
 * @Date: Created in  2018/9/16 22:41
 */
public class LogSend {

  private static final String EXCHANGE_NAME = "test_exchange_direct";

  private static final String INTO_ROUTING_NAME= "info";
  private static final String WARN_ROUTING_NAME= "warn";
  private static final String ERROR_ROUTING_NAME= "error";


  public static void main(String[] args) throws IOException, TimeoutException {
    // 建立連線
    Connection connection = ConnectionUtils.getConnection();
    // 獲取通道
    Channel channel = connection.createChannel();
    // 宣告一個direct 路由交換機
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

    // 傳送info路由鍵訊息
    String infoMessage = "Hello RabbitMQ Info Log";
    channel.basicPublish(EXCHANGE_NAME, INTO_ROUTING_NAME, null, infoMessage.getBytes());
    System.out.println("  LogSend routing info message : '" + infoMessage + "'");
    // 傳送warn路由鍵訊息
    String warnMessage = "Hello RabbitMQ Warn Log";
    channel.basicPublish(EXCHANGE_NAME, WARN_ROUTING_NAME, null, warnMessage.getBytes());
    System.out.println("  LogSend routing warn message : '" + warnMessage + "'");
    // 傳送info路由鍵訊息
    String errorMessage = "Hello RabbitMQ Error Log";
    channel.basicPublish(EXCHANGE_NAME, ERROR_ROUTING_NAME, null, errorMessage.getBytes());
    System.out.println("  LogSend routing error message : '" + errorMessage + "'");

    channel.close();
    connection.close();
  }
}

3.3 error日誌消費者

package com.makesailing.neo.consumer;

import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * # 錯誤日誌 消費者
 *
 * @Author: jamie.li
 * @Date: Created in  2018/9/16 23:09
 */
public class ErrorReceive {
  private static final String EXCHANGE_NAME = "test_exchange_direct";
  // info日誌佇列
  private static final String QUEUE_NAME = "test_queue_routing_error";

  private static final String ERROR_ROUTING_NAME= "error";

  public static void main(String[] args) throws IOException, TimeoutException {
    // 獲取連線
    Connection connection = ConnectionUtils.getConnection();

    // 開啟通道
    Channel channel = connection.createChannel();

    // 申明要消費的佇列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 繫結佇列到交換機
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ERROR_ROUTING_NAME);

    // 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
    channel.basicQos(1);

    // 建立一個回撥的消費者處理類
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 接收到的訊息
        String message = new String(body);
        System.out.println(" ErrorReceive '" + message + "' , 任務處理中");

        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          System.out.println(" ErrorReceive done ");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };

    // 消費訊息
    channel.basicConsume(QUEUE_NAME, false, consumer);

  }
}

3.4 info日誌消費者

package com.makesailing.neo.consumer;

import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * # info 日誌消費者
 *
 * @Author: jamie.li
 * @Date: Created in  2018/9/16 23:02
 */
public class InfoReceive {

  private static final String EXCHANGE_NAME = "test_exchange_direct";
  // info日誌佇列
  private static final String QUEUE_NAME = "test_queue_routing_info";

  private static final String INTO_ROUTING_NAME= "info";

  public static void main(String[] args) throws IOException, TimeoutException {
    // 獲取連線
    Connection connection = ConnectionUtils.getConnection();

    // 開啟通道
    Channel channel = connection.createChannel();

    // 申明要消費的佇列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 繫結佇列到交換機
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, INTO_ROUTING_NAME);

    // 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
    channel.basicQos(1);

    // 建立一個回撥的消費者處理類
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 接收到的訊息
        String message = new String(body);
        System.out.println(" InfoReceive '" + message + "' , 任務處理中");

        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          System.out.println(" InfoReceive done ");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };

    // 消費訊息
    channel.basicConsume(QUEUE_NAME, false, consumer);

  }
}

3.5 warn日誌消費者

package com.makesailing.neo.consumer;

import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * # 警告日誌 消費者
 *
 * @Author: jamie.li
 * @Date: Created in  2018/9/16 23:09
 */
public class WarnReceive {
  private static final String EXCHANGE_NAME = "test_exchange_direct";
  // info日誌佇列
  private static final String QUEUE_NAME = "test_queue_routing_warn";

  private static final String INTO_ROUTING_NAME= "info";
  private static final String WARN_ROUTING_NAME= "warn";
  private static final String ERROR_ROUTING_NAME= "error";


  public static void main(String[] args) throws IOException, TimeoutException {
    // 獲取連線
    Connection connection = ConnectionUtils.getConnection();

    // 開啟通道
    Channel channel = connection.createChannel();

    // 申明要消費的佇列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 繫結佇列到交換機
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, WARN_ROUTING_NAME);
    //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, INTO_ROUTING_NAME);
    //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ERROR_ROUTING_NAME);

    // 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
    channel.basicQos(1);

    // 建立一個回撥的消費者處理類
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 接收到的訊息
        String message = new String(body);
        System.out.println("WarnReceive '" + message + "' , 任務處理中");

        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          System.out.println(" WarnReceive done ");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };

    // 消費訊息
    channel.basicConsume(QUEUE_NAME, false, consumer);

  }
}

4. 測試

提前在RabbitMQ Management建立一個direct交換機,或者先執行一次生產者(執行時會判斷交換機是否存在,不存在則建立交換機),這樣保證交換機存在,不然直接啟動消費者會提示交換機不存在。

注意點

如果在沒有佇列繫結在交換機上面時,往交換機發送訊息會丟失,之後繫結在交換機上面的佇列接收不到之前的訊息,也就是先執行第一次傳送,建立了交換機,但是還沒有佇列繫結在交換機上面,如果這次傳送的訊息就會丟失。

然後再啟動3上消費者,最後在啟動生產者.

執行結果 :

5. 多繫結情況

5.1 同一消費者繫結佇列多個路由鍵

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");

在`warn消費者中佇列繫結多個路由鍵 :

package com.makesailing.neo.consumer;

import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * # 警告日誌 消費者
 *
 * @Author: jamie.li
 * @Date: Created in  2018/9/16 23:09
 */
public class WarnReceive {
  private static final String EXCHANGE_NAME = "test_exchange_direct";
  // info日誌佇列
  private static final String QUEUE_NAME = "test_queue_routing_warn";

  private static final String INTO_ROUTING_NAME= "info";
  private static final String WARN_ROUTING_NAME= "warn";
  private static final String ERROR_ROUTING_NAME= "error";


  public static void main(String[] args) throws IOException, TimeoutException {
    // 獲取連線
    Connection connection = ConnectionUtils.getConnection();

    // 開啟通道
    Channel channel = connection.createChannel();

    // 申明要消費的佇列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 繫結佇列到交換機
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, WARN_ROUTING_NAME);
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, INTO_ROUTING_NAME);
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ERROR_ROUTING_NAME);

    // 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
    channel.basicQos(1);

    // 建立一個回撥的消費者處理類
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 接收到的訊息
        String message = new String(body);
        System.out.println("WarnReceive '" + message + "' , 任務處理中");

        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          System.out.println(" WarnReceive done ");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };

    // 消費訊息
    channel.basicConsume(QUEUE_NAME, false, consumer);

  }
}

執行結果:

如果一個消費者綁定了這3個路由鍵,那麼當生產者傳送其中一個路由鍵訊息時,該消費者都能接收到訊息.

5.2 多個消費者繫結相同的路由鍵

即消費者1繫結info,消費者2繫結 info、error .

那麼生產者傳送info路由鍵訊息時,消費者1 、2都能接收到訊息,傳送error路由鍵訊息時,只有消費者2能接收到訊息.