阿里雲RocketMQ定時/延遲訊息佇列實現
阿新 • • 發佈:2021-08-05
新的閱讀體驗:http://www.zhouhong.icu/post/157
一、業務需求
需要實現一個提前二十分鐘通知使用者去做某件事的一個業務,拿到這個業務首先想到的最簡單得方法就是使用Redis監控Key值:在排計劃時候計算當前時間與提前二十分鐘這個時間差,然後使用一個唯一的業務Key壓入Redis中並設定好過期時間,然後只需要讓Redis監控這個Key值即可,當這個Key過期後就可以直接拿到這個Key的值然後實現發訊息等業務。
關於Redis實現該業務的具體實現在之前我已經記過一篇筆記,有興趣的可以直接去瞅瞅,但是現在感覺有好多不足之處。
Redis實現定時: http://www.zhouhong.icu/post/144
二、Redis實現定時推送等功能的不足之處
由於Redis不止你一個使用,其他業務也會使用Redis,那麼最容易想到的一個缺點就是:1、如果在提醒的那一刻有大量的其他業務的Key也過期了,那麼就會很長時間都輪不到你的這個Key,就會出現訊息推送延遲等缺點;2、還有一個缺點就是像阿里雲他們的Redis根本就不支援對 Redis 的 Key值得監控(我也是因為公司使用阿里雲的Redis沒法對Key監控才從之前使用Redis監控轉移到使用RocketMQ的延時訊息推送的。。。)
三、阿里雲RocketMQ定時/延遲訊息佇列實現
其實在實現上非常簡單
1、首先去阿里雲控制檯建立所需訊息佇列資源,包括訊息佇列 RocketMQ 的例項、Topic、Group ID (GID),以及鑑權需要的 AccessKey(AK),一般公司都有現成的可以直接使用。
2、在springboot專案pom.xml新增需要的依賴。
<!--阿里雲MQ TCP--> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.7.1.Final</version> </dependency>
3、在對應環境的application.properties檔案配置引數
console: rocketmq: tcp: accessKey: XXXXXXXX使用自己的 secretKey: XXXXXXXXXXXXX使用自己的 nameSrvAddr: XXXXXXXXXXXXXXXX使用自己的 topic: XXXXXXX使用自己的 groupId: XXXXXXX使用自己的 tag: XXXXXXXXX使用自己的
4、封裝MQ配置類
import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import java.util.Properties; /** * @Description: MQ配置類 * @Author: zhouhong * @Date: 2021/8/4 */ @Configuration @EnableConfigurationProperties({PatrolMqConfig.class}) @ConfigurationProperties(prefix = "console.rocketmq.tcp") @Primary public class PatrolMqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; private String topic; private String groupId; private String tag; private String orderTopic; private String orderGroupId; private String orderTag; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public String getNameSrvAddr() { return nameSrvAddr; } public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } public String getOrderTopic() { return orderTopic; } public void setOrderTopic(String orderTopic) { this.orderTopic = orderTopic; } public String getOrderGroupId() { return orderGroupId; } public void setOrderGroupId(String orderGroupId) { this.orderGroupId = orderGroupId; } public String getOrderTag() { return orderTag; } public void setOrderTag(String orderTag) { this.orderTag = orderTag; } }
5、配置生產者
import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class PatrolProducerClient { @Autowired private PatrolMqConfig mqConfig; @Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; } }
6、消費者訂閱
import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Properties; //專案中加上 @Configuration 註解,這樣服務啟動時consumer也啟動了 @Configuration @Slf4j public class PatrolConsumerClient { @Autowired private PatrolMqConfig mqConfig; @Autowired private MqTimeMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置檔案 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //將消費者執行緒數固定為20個 20為預設值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //訂閱關係 Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.set); subscriptionTable.put(subscription, messageListener); //訂閱多個topic如上面設定 consumerBean.setSubscriptionTable(subscriptionTable); System.err.println("訂閱成功!"); return consumerBean; } }
7、定時延時MQ訊息監聽消費
/** * @Description: 定時/延時MQ訊息監聽消費 * @Author: zhouhong * @Create: 2021-08-03 09:16 **/ @Component public class MqTimeMessageListener implements MessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public Action consume(Message message, ConsumeContext context) { System.err.println("收到訊息啦!!"); logger.info("接收到MQ訊息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}", message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody())); try { String msgTag = message.getTag(); // 訊息型別 String msgKey = message.getKey(); // 業務唯一id switch (msgTag) { case "XXXX": // TODO 具體業務實現,比如發訊息等操作 System.err.println("推送成功!!!!"); break; } return Action.CommitMessage; } catch (Exception e) { logger.error("消費MQ訊息失敗! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage()); //消費失敗,告知伺服器稍後再投遞這條訊息,繼續消費其他訊息 return Action.ReconsumeLater; } } }
8、封裝一個發延時/定時訊息的工具類
/** * @Description: MQ傳送訊息助手 * @Author: zhouhong * @Create: 2021-08-03 09:06 **/ @Component public class ProducerUtil { private Logger logger = LoggerFactory.getLogger(ProducerUtil.class); @Autowired private PatrolMqConfig config; @Resource(name = "ConsoleProducer") ProducerBean producerBean; public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) { Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody); msg.setStartDeliverTime(delayTime); return this.send(msg,Boolean.FALSE); } /** * 普通訊息傳送發放 * @param msg 訊息 * @param isOneWay 是否單向傳送 */ private SendResult send(Message msg,Boolean isOneWay) { try { if(isOneWay) { //由於在 oneway 方式傳送訊息時沒有請求應答處理,一旦出現訊息傳送失敗,則會因為沒有重試而導致資料丟失。 //若資料不可丟,建議選用同步或非同步傳送方式。 producerBean.sendOneway(msg); success(msg, "單向訊息MsgId不返回"); return null; }else { //可靠同步傳送 SendResult sendResult = producerBean.send(msg); //獲取傳送結果,不拋異常即傳送成功 if (sendResult != null) { success(msg, sendResult.getMessageId()); return sendResult; }else { error(msg,null); return null; } } } catch (Exception e) { error(msg,e); return null; } } private ExecutorService threads = Executors.newFixedThreadPool(3); private void error(Message msg,Exception e) { logger.error("傳送MQ訊息失敗-- Topic:{}, Key:{}, tag:{}, body:{}" ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody())); logger.error("errorMsg --- {}",e.getMessage()); } private void success(Message msg,String messageId) { logger.info("傳送MQ訊息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}" ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody())); } }
9、介面測試(10000表示延遲10秒,可以根據自己的業務計算出)
// 測試MQ延時 @Autowired ProducerUtil producerUtil; @PostMapping("/patrolTaskTemp/mqtest") public void mqTime(){ producerUtil.sendTimeMsg( "SMARTPATROL", "你好鴨!!!".getBytes(), "紅紅火火恍恍惚惚!!", System.currentTimeMillis() + 10000 ); }
10、結果
2021-08-04 22:07:12.677 INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil : 傳送MQ訊息成功 -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key:紅紅火火恍恍惚惚!!, tag:SMARTPATROL, body:你好鴨!!!
收到訊息啦!!
推送成功!!!!
2021-08-04 22:07:22.179 INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener : 接收到MQ訊息 -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key:紅紅火火恍恍惚惚!!, body:你好鴨!!!