1. 程式人生 > 實用技巧 >MQ實現任意精度延遲訊息

MQ實現任意精度延遲訊息

目錄

實現思路

  • 以RocketMQ為例
  • 大部分MQ都支援固定延時佇列,因為固定延時佇列只需要建立少量的佇列,不需要對訊息進行排序,所以實現非常簡單
  • 基於固定延時訊息和時間輪設計,我們可以想到把固定延時佇列作為佇列時間輪,比如MQ支援的固定延時佇列包括1s,2s,4s,8s,16s,32s,此時我們需要傳送一個27s的延時訊息。可以先把訊息傳送到16s延時的佇列中,16s後消費到這條訊息,但是還有11s才應該消費,所以我們把它重新發回MQ,並且投遞到8s的延時佇列。消費時發現還要3s才應該消費,然後把它又丟回2s延時的佇列;然後消費發現還有1s,那麼繼續丟回1s延時的佇列,此時再消費,發現27s到了,把訊息交給實際的消費者即可。

實現步驟

  • 啟動nameserver
  • 修改broker的配置,新增以下延時等級設定(各等級為2的冪次秒數,需與後面工具類中的延時陣列保持一致):
messageDelayLevel=1s 2s 4s 8s 16s 32s 64s 128s 256s 512s 1024s 2048s 4096s 8192s 16384s 32768s 65536s 131072s
  • 啟動broker
  • 封裝生產者

任意延時生產者

package com.zby;

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
 * Producer wrapper that sends messages with an arbitrary delay.
 *
 * <p>Every send first stamps the message through
 * {@link ArbitrarilyDelayUtil#setMsgDelay(Message, long, TimeUnit)} and then hands it to the
 * wrapped {@link MQProducer}.
 *
 * @author zby
 */
public class ArbitrarilyDelayProducer {

	/** Underlying producer that performs the actual send. */
	private final MQProducer mqProducer;

	/**
	 * Builds a {@link DefaultMQProducer} for the given group and points it at the name server.
	 * The producer is not started here; call {@link #start()} before sending.
	 *
	 * @param namesrvAddr   name server address handed to the underlying producer
	 * @param producerGroup producer group handed to the underlying producer
	 * @throws MQClientException declared for callers; see the underlying client for causes
	 */
	public ArbitrarilyDelayProducer(String namesrvAddr, String producerGroup) throws MQClientException {
		DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
		producer.setNamesrvAddr(namesrvAddr);
		mqProducer = producer;
	}

	/**
	 * Starts the underlying producer.
	 *
	 * @throws MQClientException if the underlying producer fails to start
	 */
	public void start() throws MQClientException {
		mqProducer.start();
	}

	/**
	 * Stamps the delay information onto {@code msg} and sends it.
	 *
	 * @param msg      message to send; it is modified in place by the delay stamping
	 * @param delay    requested delay, expressed in {@code timeUnit}
	 * @param timeUnit unit of {@code delay}
	 * @return the result reported by the underlying producer
	 */
	public SendResult send(final Message msg, long delay, TimeUnit timeUnit)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		ArbitrarilyDelayUtil.setMsgDelay(msg, delay, timeUnit);
		SendResult result = mqProducer.send(msg);
		return result;
	}

	/** Shuts down the underlying producer. */
	public void shutdown() {
		mqProducer.shutdown();
	}

}

工具類

package com.zby;

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.common.message.Message;

/**
 * Utility methods that split an arbitrary delay across the fixed delay levels.
 *
 * <p>Delay levels are 1-based indexes into a table of second values (powers of two); level 0
 * means "no delay". All delay arithmetic in this class is done in whole seconds.
 *
 * <p>NOTE(review): the table is presumably meant to mirror the broker's
 * {@code messageDelayLevel} configuration; confirm the two stay in sync.
 *
 * @author zby
 */
public final class ArbitrarilyDelayUtil {

	/** User property that carries the delay, in seconds, still outstanding after the current hop. */
	public static final String REMAIN_DELAY_KEY = "remainDelay";

	/** Delay in seconds for each level; index {@code i} holds the delay of level {@code i + 1}. */
	private static final long[] DELAY_SECONDS = { 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192,
			16384, 32768, 65536, 131072 };

	private ArbitrarilyDelayUtil() {
	}

	/**
	 * Stamps {@code msg} with the largest delay level that does not exceed the requested delay
	 * and records the remainder, in seconds, under {@link #REMAIN_DELAY_KEY}.
	 *
	 * <p>The requested delay is converted to whole seconds first (sub-second parts are truncated
	 * by {@link TimeUnit#toSeconds(long)}), so the level and the remainder use the same unit
	 * regardless of {@code timeUnit}.
	 *
	 * @param msg      message to stamp; modified in place
	 * @param delay    requested delay, expressed in {@code timeUnit}
	 * @param timeUnit unit of {@code delay}
	 * @throws IllegalArgumentException if the delay converts to a negative number of seconds
	 */
	public static void setMsgDelay(Message msg, long delay, TimeUnit timeUnit) {
		long delaySeconds = timeUnit.toSeconds(delay);
		int delayTimeLevel = getLatestDelayLevelFromDelayTime(delaySeconds);
		msg.setDelayTimeLevel(delayTimeLevel);
		// Subtract in seconds: mixing the raw delay with the level's seconds would store a
		// wrong remainder whenever the caller passes a unit other than SECONDS.
		long remainSeconds = delaySeconds - getDelayTimeFromDelayLevel(delayTimeLevel);
		msg.putUserProperty(REMAIN_DELAY_KEY, Long.toString(remainSeconds));
	}

	/**
	 * Returns the delay, in seconds, that a delay level stands for.
	 *
	 * @param delayLevel delay level; 0 means no delay
	 * @return delay in seconds for that level, or 0 for level 0
	 * @throws IllegalArgumentException if the level is negative or above the highest level
	 */
	public static long getDelayTimeFromDelayLevel(int delayLevel) {
		validDelayLevel(delayLevel);
		return delayLevel == 0 ? 0 : DELAY_SECONDS[delayLevel - 1];
	}

	/**
	 * Returns the highest delay level whose delay does not exceed {@code delayTime}.
	 *
	 * @param delayTime requested delay in seconds
	 * @return 0 when {@code delayTime} is 0, otherwise the matching 1-based level; delays at or
	 *         beyond the largest table entry map to the highest level
	 * @throws IllegalArgumentException if {@code delayTime} is negative
	 */
	public static int getLatestDelayLevelFromDelayTime(long delayTime) {
		validDelayTime(delayTime);
		// Scan from the largest level down; the first entry that fits is the answer.
		for (int level = DELAY_SECONDS.length; level >= 1; level--) {
			if (delayTime >= DELAY_SECONDS[level - 1]) {
				return level;
			}
		}
		return 0;
	}

	/**
	 * Rejects negative delay times.
	 */
	private static void validDelayTime(long delayTime) {
		if (delayTime < 0) {
			throw new IllegalArgumentException("Not supported delay time:" + delayTime);
		}
	}

	/**
	 * Rejects delay levels outside {@code [0, number of levels]}.
	 */
	private static void validDelayLevel(int delayLevel) {
		if (delayLevel < 0 || delayLevel > DELAY_SECONDS.length) {
			throw new IllegalArgumentException("Not supported delay level:" + delayLevel);
		}
	}
}

常量類

package com.zby;

/**
 * Shared connection and naming constants used by the arbitrary-delay demo classes.
 *
 * <p>This is a pure constants holder: it is final and cannot be instantiated.
 */
public final class ArbitrarityDelayConstants {

	/** Name server address used by the demo producer and consumer. */
	public static final String NAME_SERVER_ADDRESS = "localhost:9876";

	/** Topic the demo messages are sent to and consumed from. */
	public static final String ARBITRARITY_DELAY_TOPIC = "arbitrarity_delay_topic";

	/** Producer group used by the demo producers. */
	public static final String ARBITRARITY_DELAY_PRODUCER_GROUP = "arbitrarity_delay_producer_group";

	/** Consumer group used by the demo consumer. */
	public static final String ARBITRARITY_DELAY_CONSUMER_GROUP = "arbitrarity_delay_consumer_group";

	private ArbitrarityDelayConstants() {
		// constants holder; not meant to be instantiated
	}
}

生產者示例

package com.zby;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Demo that sends a single message with a 27-second delay through
 * {@link ArbitrarilyDelayProducer}.
 */
public class ArbitrarilyDelayProducerDemo {

	/**
	 * Starts a producer, sends one delayed message, prints the send status and shuts down.
	 *
	 * @param args unused
	 * @throws Exception if starting the producer or sending the message fails
	 */
	public static void main(String[] args) throws Exception {
		ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
				ArbitrarityDelayConstants.NAME_SERVER_ADDRESS,
				ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
		arbitrarilyDelayProducer.start();
		try {
			// The body records the send time so the consumer output shows the observed delay.
			Message msg = new Message(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC,
					("訊息傳送時間 :" + new Date()).getBytes(StandardCharsets.UTF_8));
			SendResult sendResult = arbitrarilyDelayProducer.send(msg, 27, TimeUnit.SECONDS);
			System.out.println("訊息傳送結果:" + sendResult.getSendStatus());
		} finally {
			// Always release the producer, even when the send throws.
			arbitrarilyDelayProducer.shutdown();
		}
	}
}

封裝消費者監聽器

package com.zby;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Listener decorator that keeps re-sending a message until its remaining delay is used up, and
 * only then hands it to the wrapped business listener.
 *
 * <p>The remaining delay is read from the {@link ArbitrarilyDelayUtil#REMAIN_DELAY_KEY} user
 * property and interpreted as seconds.
 */
public class ArbitrarilyDelayMessageListenerConcurrently implements MessageListenerConcurrently {

	/** Producer used to push not-yet-due messages back with their remaining delay. */
	private final ArbitrarilyDelayProducer arbitrarilyDelayProducer;

	/** Business listener that receives messages whose delay has fully elapsed. */
	private final MessageListenerConcurrently proxyListener;

	/**
	 * @param arbitrarilyDelayProducer producer used for re-sending messages that are not yet due
	 * @param proxyListener            listener invoked with the messages that are due
	 */
	public ArbitrarilyDelayMessageListenerConcurrently(ArbitrarilyDelayProducer arbitrarilyDelayProducer,
			MessageListenerConcurrently proxyListener) {
		this.arbitrarilyDelayProducer = arbitrarilyDelayProducer;
		this.proxyListener = proxyListener;
	}

	/**
	 * Re-sends every message in the batch that still has delay left and forwards the remaining
	 * (due) messages to the wrapped listener.
	 *
	 * <p>If a re-send fails, {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} is returned so the
	 * batch is retried instead of being acknowledged and lost. A retried batch may re-send
	 * messages that were already forwarded, so downstream handling should tolerate duplicates.
	 *
	 * @param msgs    messages delivered in this batch
	 * @param context consume context, passed through to the wrapped listener
	 * @return {@code CONSUME_SUCCESS} when nothing is due and all re-sends succeeded,
	 *         {@code RECONSUME_LATER} when a re-send failed, otherwise the wrapped listener's result
	 */
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
		List<MessageExt> dueMessages = new java.util.ArrayList<>(msgs.size());
		for (MessageExt messageExt : msgs) {
			long remainSeconds = remainingDelaySeconds(messageExt);
			if (remainSeconds <= 0) {
				dueMessages.add(messageExt);
				continue;
			}
			try {
				arbitrarilyDelayProducer.send(messageExt, remainSeconds, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				// Preserve the interrupt for the consumer thread pool and let the batch be retried.
				Thread.currentThread().interrupt();
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			} catch (Exception e) {
				// Do not acknowledge: acknowledging after a failed re-send would drop the message.
				e.printStackTrace();
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			}
		}
		if (dueMessages.isEmpty()) {
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
		return proxyListener.consumeMessage(dueMessages, context);
	}

	/** Reads the remaining delay in seconds from the message; 0 when the property is absent. */
	private static long remainingDelaySeconds(MessageExt messageExt) {
		String remainDelay = messageExt.getUserProperty(ArbitrarilyDelayUtil.REMAIN_DELAY_KEY);
		return remainDelay == null ? 0L : Long.parseLong(remainDelay);
	}

}

消費者示例

package com.zby;

import java.nio.charset.StandardCharsets;
import java.util.Date;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

/**
 * Demo consumer that wraps a printing listener in
 * {@link ArbitrarilyDelayMessageListenerConcurrently} and subscribes to the demo topic.
 */
public class ArbitrarilyDelayConsumerDemo {

	/**
	 * Configures the push consumer, starts the producer used for re-sending, registers the
	 * delay-aware listener and starts consuming.
	 *
	 * @param args unused
	 * @throws MQClientException if subscribing or starting the producer/consumer fails
	 */
	public static void main(String[] args) throws MQClientException {

		DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
				ArbitrarityDelayConstants.ARBITRARITY_DELAY_CONSUMER_GROUP);
		pushConsumer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
		pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		pushConsumer.subscribe(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC, "*");

		// Producer the listener uses to push not-yet-due messages back to the broker.
		ArbitrarilyDelayProducer resendProducer = new ArbitrarilyDelayProducer(
				ArbitrarityDelayConstants.NAME_SERVER_ADDRESS,
				ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
		resendProducer.start();

		// The lambda is the business listener: it only prints the first message of the batch.
		pushConsumer.registerMessageListener(
				new ArbitrarilyDelayMessageListenerConcurrently(resendProducer, (msgs, context) -> {
					Date now = new Date();
					String threadName = Thread.currentThread().getName();
					String body = new String(msgs.get(0).getBody(), StandardCharsets.UTF_8);
					System.out.printf("消費訊息了,當前時間 %s:%s Receive New Messages: %s %n", now, threadName, body);
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}));

		pushConsumer.start();
		System.out.printf("Consumer Started.%n");
	}
}

消費者控制檯結果

15:01:38.934 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
Consumer Started.
消費訊息了,當前時間 Mon Jul 20 15:02:23 CST 2020:ConsumeMessageThread_4 Receive New Messages: 訊息傳送時間 :Mon Jul 20 15:01:56 CST 2020 

優點

  • 實現簡單
  • 任意精度(以秒為最小單位)

缺點

  • 一條訊息多次消費,浪費資源
  • 需要封裝客戶端

改進

  • 在broker上實現時間輪降級