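RocketMQ: Delayed Messages and a Code Implementation
阿新 • Published: 2018-12-17
Delayed message support
RocketMQ supports scheduled (delayed) messages, but not with arbitrary time precision: only a fixed set of delay levels is available, for example 5s, 10s, or 1m. Level 0 means no delay, level 1 is the first delay level, level 2 the second, and so on.
Configuration
Open the ./conf/broker.conf file under the installation directory and add the following setting, which defines the duration of each delay level: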
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Configuration file contents:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
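Notes on the delay configuration:
- The setting lists the delay of each level, starting from level 1; to change the delay of a particular level, edit the entry at that position;
- Supported time units are s, m, h, and d, meaning seconds, minutes, hours, and days;
- The value shown above is the default and can be adjusted by hand;
- The default is sufficient for most cases, so changing it is not recommended.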
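To make the level-to-duration mapping concrete, here is a minimal sketch that turns a messageDelayLevel string into a table of level number to delay in milliseconds. The class DelayLevelTable and its parse method are hypothetical helpers written for this illustration; they are not part of the RocketMQ API, and the broker's own parsing may differ in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a RocketMQ class.
final class DelayLevelTable {
    private DelayLevelTable() {}

    // Parses e.g. "1s 5s 10s" into {1=1000, 2=5000, 3=10000} (milliseconds).
    static Map<Integer, Long> parse(String levelString) {
        if (levelString == null || levelString.trim().isEmpty()) {
            throw new IllegalArgumentException("messageDelayLevel must not be empty");
        }
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] tokens = levelString.trim().split("\\s+");
        for (int i = 0; i < tokens.length; i++) {
            String token = tokens[i];
            char unit = token.charAt(token.length() - 1);
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;       // seconds
                case 'm': unitMillis = 60_000L; break;      // minutes
                case 'h': unitMillis = 3_600_000L; break;   // hours
                case 'd': unitMillis = 86_400_000L; break;  // days
                default:
                    throw new IllegalArgumentException("Unknown time unit in: " + token);
            }
            table.put(i + 1, value * unitMillis); // levels are 1-based
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table =
                parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println("Level 3 delay in ms: " + table.get(3));
    }
}
```

The position of each entry in the string is what defines its level, so inserting or removing an entry shifts every level after it.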
How do you send and receive delayed messages?
First, create a service class for sending messages, RocketMQProvider:
package com.ms.demo.demo.service;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;

/**
 * @ClassName RocketMQProvider
 * @Description <p>TODO</p>
 * @Author Jakemanse
 * @Date 2018/10/24 15:22
 * @Version 1.0
 **/
@Service
public class RocketMQProvider {

    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /**
     * @Desc <p>Send a delayed message</p>
     * @Author Jakemanse
     * @Date 2018/10/24 15:59
     * @Param
     * @Return void
     */
    public void delayMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();

            Message message = new Message("TopicTest", "push",
                    "Sending a delayed message---".getBytes(StandardCharsets.UTF_8));
            // Level 3 is the third entry of messageDelayLevel (10s with the list above).
            message.setDelayTimeLevel(3);

            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            for (int i = 0; i < 1; i++) {
                // Pick the queue from the argument (1) passed to send(); the send timeout is 3000 ms.
                SendResult result = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % queues.size();
                        return queues.get(index);
                    }
                }, 1, 3000);
            }
            stopWatch.stop();
            System.out.println(new Date().toString() + " Time to send 1 delayed message (ms): "
                    + stopWatch.getTotalTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
Note the most important setting: message.setDelayTimeLevel(3); With the messageDelayLevel list above, level 3 corresponds to a 10s delay.
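Because only fixed levels are available, application code usually has to translate a desired delay into a level number before calling setDelayTimeLevel. The following is a minimal sketch of one way to do that, assuming the broker uses the messageDelayLevel list shown in the configuration section. DelayLevelPicker and levelFor are hypothetical names introduced here, not RocketMQ APIs, and rounding up to the next level is just one possible policy.

```java
// Hypothetical helper for illustration only; not a RocketMQ class.
final class DelayLevelPicker {
    // Delays in seconds, assumed to mirror the broker's messageDelayLevel setting:
    // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (index 0 is level 1).
    private static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    private DelayLevelPicker() {}

    // Returns the smallest level whose delay is at least desiredSeconds.
    // Returns 0 (no delay) for non-positive input and clamps to the longest level.
    static int levelFor(long desiredSeconds) {
        if (desiredSeconds <= 0) {
            return 0;
        }
        for (int i = 0; i < LEVEL_SECONDS.length; i++) {
            if (LEVEL_SECONDS[i] >= desiredSeconds) {
                return i + 1; // levels are 1-based
            }
        }
        return LEVEL_SECONDS.length;
    }

    public static void main(String[] args) {
        System.out.println("Level for a 10s delay: " + levelFor(10));
        System.out.println("Level for a 7s delay: " + levelFor(7));
    }
}
```

In the producer, the result would be passed as message.setDelayTimeLevel(DelayLevelPicker.levelFor(10)). If the broker's messageDelayLevel is customised, the array in this sketch has to be kept in sync with it by hand.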
Next, create a service class for the consumer side, RocketConsumer:
package com.ms.demo.demo.service;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * @ClassName RocketConsumer
 * @Description <p>TODO</p>
 * @Author Jakemanse
 * @Date 2018/10/24 15:16
 * @Version 1.0
 **/
@Service
public class RocketConsumer {

    @Value("${apache.rocketmq.producer.consumerGroup}")
    private String consumerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    public void defaultMQPushConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            // Subscribe to the same topic and tag that the producer uses.
            consumer.subscribe("TopicTest", "push");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        // System.out.println("messageExt:" + messageExt);
                        String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                        System.out.println(new Date().toString() + " Consumed: "
                                + messageExt.getMsgId() + ", MsgBody:" + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // Ask the broker to redeliver the batch later.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
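Note: do not forget to call consumer.start(); otherwise the listener never starts and no messages are received.
Contents of the messaging properties configuration:
# Consumer group name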
apache.rocketmq.producer.consumerGroup=PushConsumer
# Producer group name
apache.rocketmq.producer.producerGroup=Producer
# NameServer address
apache.rocketmq.namesrvAddr=127.0.0.1:9876