MQ實現任意精度延遲訊息
阿新 • • 發佈:2020-07-20
目錄
實現思路
- 以RocketMQ為例
- 大部分MQ都支援固定延時佇列,因為固定延時佇列只需要建立少量的佇列,不需要對訊息進行排序,所以實現非常簡單
- 基於固定延時訊息和時間輪設計,我們可以想到把固定延時佇列作為佇列時間輪,比如MQ支援的固定延時佇列包括1s,2s,4s,8s,16s,32s,此時我們需要傳送一個27s的延時訊息。可以先把訊息傳送到16s延時的佇列中,16s後消費到這條訊息,但是還有11s才應該消費,所以我們把它重新發回MQ,並且投遞到8s的延時佇列。消費時發現還要3s才應該消費,然後把它又丟回2s延時的佇列;然後消費發現還有1s,那麼繼續丟回1s延時的佇列,此時再消費,發現27s到了,把訊息交給實際的消費者即可。
實現步驟
- 啟動nameserver
- 修改broker的配置,新增
messageDelayLevel=1s 2s 4s 8s 16s 32s 64s 128s 256s 512s 1024s 2048s 4096s 8192s 16384s 32768s 65536s 131072s
- 啟動broker
- 封裝生產者
任意延時生產者
package com.zby; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; /** * 任意延時生產者 * @author zby * */ public class ArbitrarilyDelayProducer{ private MQProducer mqProducer; public ArbitrarilyDelayProducer(String namesrvAddr,String producerGroup) throws MQClientException { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup); defaultMQProducer.setNamesrvAddr(namesrvAddr); this.mqProducer=defaultMQProducer; } public void start() throws MQClientException { mqProducer.start(); } public SendResult send(final Message msg,long delay,TimeUnit timeUnit) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ArbitrarilyDelayUtil.setMsgDelay(msg, delay, timeUnit); return mqProducer.send(msg); } public void shutdown() { mqProducer.shutdown(); } }
工具類
package com.zby;

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.common.message.Message;

/**
 * Helpers that map an arbitrary delay onto the fixed delay levels and record what is left over.
 *
 * <p>The level table mirrors the power-of-two seconds list (1s, 2s, 4s ... 131072s); level
 * {@code n} (1-based) corresponds to {@code 2^(n-1)} seconds and level 0 means "no delay".
 *
 * @author zby
 */
public class ArbitrarilyDelayUtil {

    /** User property key under which the remaining delay (in seconds) is stored on a message. */
    public static final String REMAIN_DELAY_KEY = "remainDelay";

    /** Delay in seconds for each level; index {@code i} holds the delay of level {@code i + 1}. */
    private static final long[] DELAY_SECONDS = new long[] { 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048,
            4096, 8192, 16384, 32768, 65536, 131072 };

    private ArbitrarilyDelayUtil() {
    }

    /**
     * Sets the delay level of {@code msg} to the largest level not exceeding the requested delay
     * and stores the remaining delay, in seconds, under {@link #REMAIN_DELAY_KEY}.
     *
     * <p>The delay is converted to whole seconds first (any sub-second part is truncated by
     * {@link TimeUnit#toSeconds(long)}), and both the level and the remainder are derived from
     * that same value, so the stored remainder is always in seconds regardless of
     * {@code timeUnit}.
     *
     * @param msg      message to stamp
     * @param delay    requested delay, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code delay}
     * @throws IllegalArgumentException if the delay converts to a negative number of seconds
     */
    public static void setMsgDelay(Message msg, long delay, TimeUnit timeUnit) {
        long delaySeconds = timeUnit.toSeconds(delay);
        int delayTimeLevel = getLatestDelayLevelFromDelayTime(delaySeconds);
        msg.setDelayTimeLevel(delayTimeLevel);
        // The remainder must be computed from the converted seconds, not from the raw delay,
        // otherwise any unit other than SECONDS yields a wrong (even negative) remainder.
        long remainSeconds = delaySeconds - getDelayTimeFromDelayLevel(delayTimeLevel);
        msg.putUserProperty(REMAIN_DELAY_KEY, Long.toString(remainSeconds));
    }

    /**
     * Returns the delay, in seconds, that belongs to a delay level.
     *
     * @param delayLevel delay level, from 0 (no delay) up to the number of configured levels
     * @return delay in seconds; 0 for level 0
     * @throws IllegalArgumentException if the level is outside the supported range
     */
    public static long getDelayTimeFromDelayLevel(int delayLevel) {
        validDelayLevel(delayLevel);
        if (delayLevel == 0) {
            return 0;
        }
        return DELAY_SECONDS[delayLevel - 1];
    }

    /**
     * Returns the highest delay level whose delay does not exceed {@code delayTime}.
     *
     * @param delayTime delay in seconds
     * @return matching level; 0 when {@code delayTime} is 0, the top level when it is at or
     *         beyond the largest configured delay
     * @throws IllegalArgumentException if {@code delayTime} is negative
     */
    public static int getLatestDelayLevelFromDelayTime(long delayTime) {
        validDelayTime(delayTime);
        // Walk from the largest level down; the first one that fits is the answer.
        for (int level = DELAY_SECONDS.length; level >= 1; level--) {
            if (delayTime >= DELAY_SECONDS[level - 1]) {
                return level;
            }
        }
        return 0;
    }

    /** Rejects negative delay times. */
    private static void validDelayTime(long delayTime) {
        if (delayTime < 0) {
            throw new IllegalArgumentException("Not supported delay time:" + delayTime);
        }
    }

    /** Rejects delay levels outside {@code [0, number of levels]}. */
    private static void validDelayLevel(int delayLevel) {
        if (delayLevel < 0 || delayLevel > DELAY_SECONDS.length) {
            throw new IllegalArgumentException("Not supported delay level:" + delayLevel);
        }
    }
}
常量類
package com.zby;
/**
 * Shared constants for the arbitrary-delay producer and consumer demos.
 *
 * <p>Pure constants holder: it is final and cannot be instantiated.
 */
public final class ArbitrarityDelayConstants {

    /** Name server address used by both demos. */
    public static final String NAME_SERVER_ADDRESS = "localhost:9876";

    /** Topic the demo producer sends to and the demo consumer subscribes to. */
    public static final String ARBITRARITY_DELAY_TOPIC = "arbitrarity_delay_topic";

    /** Producer group used when creating the delay producer. */
    public static final String ARBITRARITY_DELAY_PRODUCER_GROUP = "arbitrarity_delay_producer_group";

    /** Consumer group used when creating the demo consumer. */
    public static final String ARBITRARITY_DELAY_CONSUMER_GROUP = "arbitrarity_delay_consumer_group";

    private ArbitrarityDelayConstants() {
    }
}
生產者示例
package com.zby;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
 * Demo: sends one message with a 27-second delay through {@link ArbitrarilyDelayProducer}.
 */
public class ArbitrarilyDelayProducerDemo {

    /**
     * Starts the producer, sends a single delayed message, prints the send status and shuts
     * the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer or sending the message fails
     */
    public static void main(String[] args) throws Exception {
        ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
                ArbitrarityDelayConstants.NAME_SERVER_ADDRESS,
                ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
        arbitrarilyDelayProducer.start();
        try {
            // The body carries the send time so the consumer output shows the observed delay.
            Message msg = new Message(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC,
                    ("訊息傳送時間 :" + new Date()).getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = arbitrarilyDelayProducer.send(msg, 27, TimeUnit.SECONDS);
            System.out.println("訊息傳送結果:" + sendResult.getSendStatus());
        } finally {
            // Always release the producer, even when the send fails.
            arbitrarilyDelayProducer.shutdown();
        }
    }
}
封裝消費者監聽器
package com.zby;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
/**
 * Listener that keeps re-sending a message while it still has remaining delay and only hands
 * it to the wrapped listener once nothing remains.
 *
 * <p>The remaining delay is read from the user property
 * {@link ArbitrarilyDelayUtil#REMAIN_DELAY_KEY} and interpreted as seconds.
 */
public class ArbitrarilyDelayMessageListenerConcurrently implements MessageListenerConcurrently {

    /** Producer used to send a message back with its remaining delay. */
    private final ArbitrarilyDelayProducer arbitrarilyDelayProducer;

    /** Business listener that receives messages whose delay has fully elapsed. */
    private final MessageListenerConcurrently proxyListener;

    /**
     * @param arbitrarilyDelayProducer producer used for re-sending still-delayed messages
     * @param proxyListener            listener invoked once no delay remains
     */
    public ArbitrarilyDelayMessageListenerConcurrently(ArbitrarilyDelayProducer arbitrarilyDelayProducer,
            MessageListenerConcurrently proxyListener) {
        this.arbitrarilyDelayProducer = arbitrarilyDelayProducer;
        this.proxyListener = proxyListener;
    }

    /**
     * Re-sends the message when it still has remaining delay, otherwise delegates to the
     * wrapped listener.
     *
     * <p>NOTE(review): only the first message of {@code msgs} is inspected, which presumes the
     * consumer delivers one message per call (batch size 1) — confirm against the consumer
     * configuration.
     *
     * @return {@code CONSUME_SUCCESS} after a successful re-send, {@code RECONSUME_LATER} when
     *         the re-send fails (so the message is not lost), or whatever the wrapped listener
     *         returns when no delay remains
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        MessageExt messageExt = msgs.get(0);
        String remainDelay = messageExt.getUserProperty(ArbitrarilyDelayUtil.REMAIN_DELAY_KEY);
        long remainSeconds = remainDelay == null ? 0L : Long.parseLong(remainDelay);
        if (remainSeconds <= 0) {
            return proxyListener.consumeMessage(msgs, context);
        }
        try {
            arbitrarilyDelayProducer.send(messageExt, remainSeconds, TimeUnit.SECONDS);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the calling thread and ask for redelivery.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        } catch (Exception e) {
            // Acknowledging here would drop the message: it was neither re-sent nor consumed.
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
消費者示例
package com.zby;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
/**
 * Demo: consumes the delay topic through {@link ArbitrarilyDelayMessageListenerConcurrently}
 * and prints each message once it reaches the business listener.
 */
public class ArbitrarilyDelayConsumerDemo {

    /**
     * Configures the push consumer, starts the producer used for re-sending, registers the
     * delay-aware listener and starts consuming.
     *
     * @param args unused
     * @throws MQClientException if subscribing, starting the producer or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                ArbitrarityDelayConstants.ARBITRARITY_DELAY_CONSUMER_GROUP);
        consumer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC, "*");

        ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
                ArbitrarityDelayConstants.NAME_SERVER_ADDRESS,
                ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
        arbitrarilyDelayProducer.start();
        try {
            ArbitrarilyDelayMessageListenerConcurrently arbitrarilyDelayMessageListenerConcurrently = new ArbitrarilyDelayMessageListenerConcurrently(
                    arbitrarilyDelayProducer, (msgs, context) -> {
                        System.out.printf("消費訊息了,當前時間 %s:%s Receive New Messages: %s %n", new Date(),
                                Thread.currentThread().getName(),
                                new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    });
            consumer.registerMessageListener(arbitrarilyDelayMessageListenerConcurrently);
            consumer.start();
        } catch (MQClientException | RuntimeException e) {
            // The producer is already running; release it if the consumer cannot be started.
            arbitrarilyDelayProducer.shutdown();
            throw e;
        }
        System.out.printf("Consumer Started.%n");
    }
}
消費者控制檯結果
15:01:38.934 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
Consumer Started.
消費訊息了,當前時間 Mon Jul 20 15:02:23 CST 2020:ConsumeMessageThread_4 Receive New Messages: 訊息傳送時間 :Mon Jul 20 15:01:56 CST 2020
優點
- 實現簡單
- 支援任意秒數的延時(最小粒度為1s,由最小的延時等級決定)
缺點
- 一條訊息多次消費,浪費資源
- 需要封裝客戶端
改進
- 在broker上實現時間輪降級:由broker直接把到期但仍有剩餘延時的訊息轉投到下一級延時佇列,避免訊息在broker與消費端之間反覆往返