1. 程式人生 > >RabbitMQ高階特性-消費端ACK與重回佇列

RabbitMQ高階特性-消費端ACK與重回佇列

消費端ACK與重回佇列

消費端ACK

  • 消費端的手工ACK和NACK, ACK是確認成功消費, NACK表示訊息處理失敗, 會重發訊息
  • 消費端進行消費的時候, 如果由於業務異常我們可以進行日誌的記錄, 然後進行補償
  • 如果由於伺服器宕機等嚴重問題, 就需要手工進行ACK保障消費端消費成功

重回佇列

  • 消費端重回佇列是為了對沒有處理成功的訊息, 把訊息重新回遞給Broker
  • 一般在實際應用中, 都會關閉重回佇列, 也就是設定為False

程式碼演示

自定義消費者

package com.qiyexue.api.ack;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeUnit; /** * 自定義消費者 * 設定手動ack和重回佇列 * @author 七夜雪 * @date 2018-12-16 8:20 */ public class
MyConsumer extends DefaultConsumer { private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { System.out.println("-------------自定義消費者------------"); System.out.println("consumerTag : " + consumerTag); System.out.println("envelope : " + envelope); System.out.println("properties : " + properties); System.out.println("body : " + new String(body)); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } if ((Integer)properties.getHeaders().get("num") == 0) { // 三個引數 : DeliveryTag, 是否批量拒絕, 是否可以重回佇列 channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }

Producer

package com.qiyexue.api.ack;

import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;

/**
 * 生產者
 *
 * @author 七夜雪
 * @date 2018-12-15 19:56
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        // 1. 建立ConnectionFactory, 並設定屬性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 建立連線
        Connection connection = factory.newConnection();

        // 3. 建立channel
        Channel channel = connection.createChannel();


        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.qiye";

        for (int i = 0; i < 5; i++) {
            // 傳送訊息
            String msg = "Hello, 七夜雪 " + i;
            Map<String, Object> hearder = new HashMap<String, Object>();
            hearder.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .contentEncoding("UTF-8")
                    .headers(hearder).build();
            channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
        }


        // 關閉連線
        channel.close();
        connection.close();

    }

}

Consumer

package com.qiyexue.api.ack;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消費者,關閉自動ack
 *
 * @author 七夜雪
 * @date 2018-12-15 20:07
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1. 建立連線工廠並設定屬性
        ConnectionFactory factory = new ConnectionFactory();;
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 建立連線
        Connection connection = factory.newConnection();

        // 3. 建立channel
        Channel channel = connection.createChannel();

        // 4. 宣告Exchange
        String exchangeName = "test_ack_exchange";
        String exchangeType = "topic";
        String routingKey = "ack.*";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);

        // 5. 宣告訊息佇列
        String queueName = "test_ack_queue";
        channel.queueDeclare(queueName, true, false, false, null);

        // 6. 繫結佇列和Exchange
        channel.queueBind(queueName, exchangeName, routingKey);

        // 7. 設定消費者為自定義的消費者, 將autoAck設定為false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    }

}