1. 程式人生 > >RabbitMQ(二)

RabbitMQ(二)

hash sets uri dir block false ons 獲取 消息

RabbitMQ的消息優先

RabbitMQ可以設置隊列的優先級,在隊列中的高優先級消息會被優先消費。在設置優先級時,首先需要設置隊列的最高優先級,然後在生產者發送消息時設置該條消息的優先級,最後在隊列中的高優先級的消息會被先發送給消費者消費

設置隊列的最高優先級

設置隊列的最高優先級在聲明隊列時進行設置,代碼如下:

Map<String, Object> queueArgs = new HashMap<>(1);
queueArgs.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false
, false, queueArgs);

設置消息的優先級

設置消息的優先級在生產者生成消息時進行設置,代碼如下:

BasicProperties properties = new BasicProperties.Builder()
    .priority(i)
    .build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message);

註意:當消費者消費速度大於生產端,且Broker中沒有消息堆積的話,也就是說當生產者生產一條消息就被消費者消費,消息隊列中沒有消息堆積的話,設置消息優先級是沒有意義的

例子

生產者

Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
for (int i = 0; i < 10; i++) {
    BasicProperties properties = new BasicProperties.Builder()
        .priority(i)
        .build();
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties,
        String.valueOf
(i).getBytes(StandardCharsets.UTF_8)); }

消費者

Channel channel = connection.createChannel();
channel.exchangeDeclare(PriorityProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> queueArgs = new HashMap<>(1);
queueArgs.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs);
channel.queueBind(QUEUE_NAME, PriorityProducer.EXCHANGE_NAME, PriorityProducer.ROUTING_KEY);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, StandardCharsets.UTF_8);
        System.out.print(message + " ");
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

輸出如下: 0 9 8 7 6 5 4 3 2 1

由於消費者設置了消息預取數量為1,所以會先取0消費,然後造成消息在消息隊列中的積壓,後面取的話就會先取優先級高的消息

RabbitMQ實現延遲消息

RabbitMQ使用AMQP協議,在AMQP協議中沒有直接實現延遲消息,所以我們使用死信隊列(DLX)和消息存活時間(TTL)模擬出延遲隊列

死信隊列(DLX)

當消息在隊列中變為死信消息(Dead Message)後,該消息會被Publish到該隊列的DLX(Dead-Letter-Exchange)中。DLX就是一個Exchange,當消息被發送到DLX後可以路由到隊列中進行重新消費

消息在消息隊列中變為死信消息的幾種情況:

  • 消息被拒絕並且不會重新進入隊列(requeue=false)
  • 消息TTL過期
  • 消息隊列達到最大長度

在聲明隊列時設置該隊列的死信隊列以及發送消息到死信隊列的Routing Key,代碼如下:

Map<String, Object> queueArgs = new HashMap<>(2);
// 設置死信隊列
queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME);
// 設置死信Roting Key,不設置默認使用該Queue的Routing Key
queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY);
channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs);

實現延遲隊列

可以通過DDL和DLX實現延遲隊列,具體實現邏輯如下:
把消息發送到普通的隊列中(該隊列設置死信隊列),當消息DDL到期後會發送到死信隊列中,然後通過消費死信隊列中的消息實現延遲隊列,示例代碼如下:

生產者

Channel channel = connection.createChannel();
channel.exchangeDeclare(PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
BasicProperties properties = new BasicProperties.Builder()
    // 設置消息的TTL
    .expiration("60000")
    .build();
channel.basicPublish(PLAIN_EXCHANGE_NAME, PLAIN_ROUTING_KEY, properties,
    "Hello".getBytes(StandardCharsets.UTF_8));

消費者

Channel channel = connection.createChannel();
channel.exchangeDeclare(DelayProducer.PLAIN_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DelayProducer.DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> queueArgs = new HashMap<>(2);
// 設置死信隊列
queueArgs.put("x-dead-letter-exchange", DelayProducer.DLX_EXCHANGE_NAME);
// 設置死信Roting Key,不設置默認使用該Queue的Routing Key
queueArgs.put("x-dead-letter-routing-key", DelayProducer.DLX_ROUTING_KEY);
channel.queueDeclare(PLAIN_QUEUE_NAME, false, false, false, queueArgs);
channel.queueBind(PLAIN_QUEUE_NAME, DelayProducer.PLAIN_EXCHANGE_NAME, DelayProducer.PLAIN_ROUTING_KEY);
channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
channel.queueBind(DLX_QUEUE_NAME, DelayProducer.DLX_EXCHANGE_NAME, DelayProducer.DLX_ROUTING_KEY);
// 由於消息到普通隊列中TTL時間內沒有消費,所以該消息會被發送到死信隊列中,所以我們通過消費死信隊列來實現延遲消息
channel.basicConsume(DLX_QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, StandardCharsets.UTF_8);
        System.out.println(message);
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

RabbitMQ消費模式

由前面的消息隊列系列文章可以看出來,消費者可以獲取消息有Pull、Push模型。RabbitMQ兩種模型都支持,但是其對Pull模型支持不太好,需要自己實現輪詢查詢是否有消息。下面是兩種模型的簡單使用

Push模型

Push模型是RabbitMQ服務器主動推送消息給Consumer。這種模型有點像設計模式中的時間驅動模式,需要Consumer註冊回調接口到RabbitMQ服務器中,當RabbitMQ服務器有消息時會主動回調接口發送消息。Push模型有慢消費的缺點,RabbitMQ通過設置消費者預取消息數量來控制服務器發送消息的速度。我們經常用到就是這種模式,Consumer示例代碼如下:

Channel channel = connection.createChannel();
channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false,false, false, null);
channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY);
channel.basicConsumer(QUEUE_NAME, false, new DefaultConsumer(channel) {
   
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        // 處理消息邏輯
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

Pull模型

Pull模型Consumer主動去RabbitMQ服務器拉消息。這種模式的缺點是消息延遲和忙等,需要自己設計輪詢方案。Consumer示例代碼如下,沒有實現輪詢方案:

Connection connection = Basic.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(PullProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false,false, false, null);
channel.queueBind(QUEUE_NAME, PullProducer.EXCHANGE_NAME, PullProducer.ROUTING_KEY);
while (true) {
    GetResponse response = channel.basicGet(QUEUE_NAME, false);
    if (response == null) {
        continue;
    }
    String message = new String(response.getBody(), StandardCharsets.UTF_8);
    // 處理消息邏輯
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}

Reference

http://blog.csdn.net/u013256816/article/details/55105495
http://blog.csdn.net/u013256816/article/details/62890189

RabbitMQ(二)