1. 程式人生 > >rocketmq延時消息

rocketmq延時消息

string 解決 print log void ride reg .sh art

rocketmq提供一種延時消息的解決方案,就是在特定的時間到了,消息才會被投遞出去供consumer消費。

總體來是簡單的場景是滿足了,但是需要註意的是延時的時間是需要按照默認配置的延時級別去配置的,而不是隨意設置消息的延時時間。

如果想不受延時級別的約束 可以參考之前的一遍文章http://blog.seoui.com/2017/08/19/delayqueue/

默認的延遲級別

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

這個配置下標從1開始 比如級別2是延時5秒、級別5是延時1分鐘。默認配置在不滿足需求的情況下,可以在broker

配置文件加入messageDelayLevel參數覆蓋默認的延時級別配置。

示例

和普通的消息不同之處在於Producer在發送消息的時候 需要設置message.setDelayTimeLevel();延遲級別方法。其他參數和消費端的寫法並與不同之處。

Producer
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = 
        new DefaultMQProducer("ExampleProducerGroup");
        producer.start();
        
int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); //延時的級別為3 對應的時間為10s 就是發送後延時10S在把消息投遞出去 message.setDelayTimeLevel(3); producer.send(message); } producer.shutdown(); } }
Consumer
public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = 
        new DefaultMQPushConsumer("ExampleConsumer");
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
         ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("ok!");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

技術分享圖片

rocketmq延時消息