
RocketMQ: Delayed Messages and a Code Implementation

Delayed message support

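RocketMQ supports scheduled (delayed) messages, but not with arbitrary time precision. Only a fixed set of delay levels is available, for example 5s, 10s, or 1m. level=0 means no delay, level=1 means the level-1 delay, level=2 means the level-2 delay, and so on.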

Configuration

Open the ./conf/broker.conf file in the installation directory and add the following setting, which defines the duration of each delay level:

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Contents of the configuration file:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

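Notes on the delay configuration:

  1. The setting lists the delay for each level, starting at level 1; editing an entry changes the delay of that level.
  2. Supported time units are s, m, h, and d, for seconds, minutes, hours, and days.
  3. The value shown above is the default; it can be adjusted manually.
  4. The default is sufficient for most cases, and changing it is not recommended.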
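
To make the level numbering concrete, here is a minimal sketch that turns a messageDelayLevel value into a level-to-milliseconds table. It is only an illustration of the rules listed above, not RocketMQ's own parsing code; the class name DelayLevelTable and the method parse are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for a messageDelayLevel value; not RocketMQ's own code. */
final class DelayLevelTable {

    /** Parses e.g. "1s 5s 10s" into {1=1000, 2=5000, 3=10000} (level -> milliseconds). */
    static Map<Integer, Long> parse(String messageDelayLevel) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        int level = 1; // levels start at 1; level 0 means "no delay"
        for (String token : messageDelayLevel.trim().split("\\s+")) {
            char unit = token.charAt(token.length() - 1);
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;      // seconds
                case 'm': unitMillis = 60_000L; break;     // minutes
                case 'h': unitMillis = 3_600_000L; break;  // hours
                case 'd': unitMillis = 86_400_000L; break; // days
                default:
                    throw new IllegalArgumentException("Unknown time unit in: " + token);
            }
            table.put(level++, amount * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        String levels = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        parse(levels).forEach((level, millis) ->
                System.out.println("level " + level + " -> " + millis + " ms"));
    }
}
```

With the default value, the table has 18 entries, and level 3 maps to 10 seconds.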

How do you send and receive delayed messages?

First, create a service class for sending messages, RocketMQProvider:

package com.ms.demo.demo.service;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;

import java.util.Date;
import java.util.List;

/**
 * @ClassName RocketMQProvider
 * @Description <p>Service that sends a delayed message to RocketMQ.</p>
 * @Author Jakemanse
 * @Date 2018/10/24 15:22
 * @Version 1.0
 **/
@Service
public class RocketMQProvider {
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;


    /**
     * @Desc <p>Sends a delayed message.</p>
     * @Author Jakemanse
     * @Date 2018/10/24  15:59
     * @Param
     * @Return void
     */
    public void delayMQProducer() {

        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();

            Message message = new Message("TopicTest", "push", "Sending a delayed message---".getBytes());
            // Level 3 is the third entry of messageDelayLevel, i.e. 10s with the default list.
            message.setDelayTimeLevel(3);

            StopWatch stopWatch = new StopWatch();
            stopWatch.start();

            for (int i = 0; i < 1; i++) {
                SendResult result = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        int index = id % list.size();
                        return list.get(index);
                    }
                }, 1,3000);
            }
            stopWatch.stop();
            System.out.println(new Date().toString() + " time taken to send 1 delayed message (ms): " + stopWatch.getTotalTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }




}
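
The selector in the producer above maps its argument (here 1) to a queue with id % list.size(). The sketch below isolates that selection rule using only the standard library, so it runs without a broker. The class ModuloSelector and its select method are hypothetical names, and the plain strings stand in for MessageQueue objects. It uses Math.floorMod instead of %, an assumption on my part that negative keys should still yield a valid index.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative stand-in for the modulo queue selection used in the producer above. */
final class ModuloSelector {

    /** Picks one element by key; floorMod keeps the index non-negative for negative keys. */
    static <T> T select(List<T> queues, int key) {
        int index = Math.floorMod(key, queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        System.out.println(select(queues, 1));  // same key as in the producer above
        System.out.println(select(queues, -1)); // a negative key still maps to a valid queue
    }
}
```

Because the same key always maps to the same index while the queue list is unchanged, messages sent with the same argument land on the same queue.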

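Note the most important setting, message.setDelayTimeLevel(3); with the messageDelayLevel value shown earlier, level 3 corresponds to a 10s delay.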
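
Since only fixed levels are available, a caller that wants "about N seconds" has to pick a level. The following is a minimal sketch of one way to do that, assuming the default messageDelayLevel list and assuming that rounding up to the next level is acceptable. The class DelayLevelChooser and the method levelFor are hypothetical and not part of the RocketMQ client.

```java
/** Illustrative helper; the delays mirror the default messageDelayLevel value. */
final class DelayLevelChooser {

    // Delays in seconds for levels 1..18: 1s 5s 10s 30s 1m 2m ... 10m 20m 30m 1h 2h
    private static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    /** Returns the lowest level whose delay is at least the requested one; 0 means no delay. */
    static int levelFor(long desiredSeconds) {
        if (desiredSeconds <= 0) {
            return 0;
        }
        for (int i = 0; i < LEVEL_SECONDS.length; i++) {
            if (LEVEL_SECONDS[i] >= desiredSeconds) {
                return i + 1; // levels are 1-based
            }
        }
        return LEVEL_SECONDS.length; // longer than 2h: fall back to the highest level
    }

    public static void main(String[] args) {
        System.out.println("10s -> level " + levelFor(10));
        System.out.println("7s  -> level " + levelFor(7));
    }
}
```

The result could then be passed to message.setDelayTimeLevel(...). If the broker's messageDelayLevel has been changed, the array would need to be updated to match.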

Next, create a service class for the consumer side, RocketConsumer:

package com.ms.demo.demo.service;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @ClassName RocketConsumer
 * @Description <p>Service that consumes messages from RocketMQ and prints them.</p>
 * @Author Jakemanse
 * @Date 2018/10/24 15:16
 * @Version 1.0
 **/
@Service
public class RocketConsumer {

    @Value("${apache.rocketmq.producer.consumerGroup}")
    private String consumerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    public void defaultMQPushConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            consumer.subscribe("TopicTest", "push");

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list,context)->{
                try {
                    for (MessageExt messageExt : list) {
//                        System.out.println("messageExt:" + messageExt);
                        String messageBody = new String(messageExt.getBody());
                        System.out.println(new Date().toString() + "  consumed: " + messageExt.getMsgId() + ", MsgBody:" + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

            consumer.start();

        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

Be sure not to forget consumer.start(); it starts the listener so that messages are actually received.

Contents of the messaging properties file:


# Consumer group name
apache.rocketmq.producer.consumerGroup=PushConsumer
# Producer group name
apache.rocketmq.producer.producerGroup=Producer
# NameServer address
apache.rocketmq.namesrvAddr=127.0.0.1:9876
