1. 程式人生 > 其它 >RocketMQ 延遲訊息

RocketMQ 延遲訊息

技術標籤:rocketMQ延遲訊息佇列java中介軟體java佇列

延遲訊息

生產者把訊息傳送到訊息佇列中以後,並不期望被立即消費,而是等待指定時間後才可以被消費者消費,這類訊息通常被稱為延遲訊息。

在RocketMQ中,支援延遲訊息,但是不支援任意時間精度的延遲訊息,只支援特定級別的延遲訊息。
訊息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。

本文使用 RocketMQ 延遲訊息處理訂單超時場景

傳送延遲訊息(生產者)

/**
     * 推送延遲訊息
     * @param topic 
     * @param body 
     * @param producerGroup 
     * @return boolean
     */
    public boolean sendMessage(String topic, String body, String producerGroup)
    {
        try
        {
            Message recordMsg = new Message(topic, body.getBytes());
            producer.setProducerGroup(producerGroup);

            //設定訊息延遲級別,我這裡設定5,對應就是延時一分鐘
            // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            recordMsg.setDelayTimeLevel(5);
            // 傳送訊息到一個Broker
            SendResult sendResult = producer.send(recordMsg);
            // 通過sendResult返回訊息是否成功送達
            log.info("傳送延遲訊息結果:======sendResult:{}", sendResult);
            DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            log.info("傳送時間:{}", format.format(new Date()));

            return true;
        }
        catch (Exception e)
        {
            e.printStackTrace();
            log.error("延遲訊息佇列推送訊息異常:{},推送內容:{}", e.getMessage(), body);
        }
        return false;
    }

傳送請求結果:
在這裡插入圖片描述

消費延遲訊息(消費者)

/**
     * 接收延遲訊息
     * 
     * @param topic
     * @param consumerGroup
     * @param messageHandler
     */
    public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler)
    {
        ThreadPoolUtil.execute(() ->
        {
            try
            {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
                consumer.setConsumerGroup(consumerGroup);
                consumer.setVipChannelEnabled(false);
                consumer.setNamesrvAddr(address);
                //設定消費者拉取訊息的策略,*表示消費該topic下的所有訊息,也可以指定tag進行訊息過濾
                consumer.subscribe(topic, "*");
                //消費者端啟動訊息監聽,一旦生產者傳送訊息被監聽到,就列印訊息,和rabbitmq中的handlerDelivery類似
                consumer.registerMessageListener(messageHandler);
                consumer.start();
                log.info("啟動延遲訊息佇列監聽成功:" + topic);
            }
            catch (MQClientException e)
            {
                log.error("啟動延遲訊息佇列監聽失敗:{}", e.getErrorMessage());
                System.exit(1);
            }
        });
    }

消費結果:在一分鐘之後被消費
在這裡插入圖片描述

實現監聽類,處理具體邏輯

/**
 * 延遲訊息監聽
 * 
 * @author hucj
 */
@Component
public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent>
{

    @Resource
    private MQUtil mqUtil;

    @Resource
    private CourseOrderTimeoutHandler courseOrderTimeoutHandler;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent)
    {
        // 訂單超時監聽
        mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler);
    }
}



/**
 *  實現監聽
 *
 * @author hucj
 */
@Slf4j
@Component
public class CourseOrderTimeoutHandler implements MessageListenerConcurrently
{

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list)
        {
            // 得到訊息體
            String body = new String(msg.getBody());
            JSONObject userJson = JSONObject.parseObject(body);
            TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class);

            // 處理具體的業務邏輯,,,,,

			DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     	    log.info("消費時間:{}", format.format(new Date()));
           
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

不足或疑問請留言