RocketMQ 延遲訊息
阿新 • • 發佈:2021-01-24
技術標籤:rocketMQ延遲訊息佇列java中介軟體java佇列
延遲訊息
生產者把訊息傳送到訊息佇列中以後,並不期望被立即消費,而是等待指定時間後才可以被消費者消費,這類訊息通常被稱為延遲訊息。
在RocketMQ中,支援延遲訊息,但是不支援任意時間精度的延遲訊息,只支援特定級別的延遲訊息。
訊息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。
本文使用 RocketMQ 延遲訊息處理訂單超時場景
傳送延遲訊息(生產者)
/** * 推送延遲訊息 * @param topic * @param body * @param producerGroup * @return boolean */ public boolean sendMessage(String topic, String body, String producerGroup) { try { Message recordMsg = new Message(topic, body.getBytes()); producer.setProducerGroup(producerGroup); //設定訊息延遲級別,我這裡設定5,對應就是延時一分鐘 // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" recordMsg.setDelayTimeLevel(5); // 傳送訊息到一個Broker SendResult sendResult = producer.send(recordMsg); // 通過sendResult返回訊息是否成功送達 log.info("傳送延遲訊息結果:======sendResult:{}", sendResult); DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("傳送時間:{}", format.format(new Date())); return true; } catch (Exception e) { e.printStackTrace(); log.error("延遲訊息佇列推送訊息異常:{},推送內容:{}", e.getMessage(), body); } return false; }
傳送請求結果:
消費延遲訊息(消費者)
/** * 接收延遲訊息 * * @param topic * @param consumerGroup * @param messageHandler */ public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler) { ThreadPoolUtil.execute(() -> { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup(consumerGroup); consumer.setVipChannelEnabled(false); consumer.setNamesrvAddr(address); //設定消費者拉取訊息的策略,*表示消費該topic下的所有訊息,也可以指定tag進行訊息過濾 consumer.subscribe(topic, "*"); //消費者端啟動訊息監聽,一旦生產者傳送訊息被監聽到,就列印訊息,和rabbitmq中的handlerDelivery類似 consumer.registerMessageListener(messageHandler); consumer.start(); log.info("啟動延遲訊息佇列監聽成功:" + topic); } catch (MQClientException e) { log.error("啟動延遲訊息佇列監聽失敗:{}", e.getErrorMessage()); System.exit(1); } }); }
消費結果:在一分鐘之後被消費
實現監聽類,處理具體邏輯
/** * 延遲訊息監聽 * * @author hucj */ @Component public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent> { @Resource private MQUtil mqUtil; @Resource private CourseOrderTimeoutHandler courseOrderTimeoutHandler; @Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { // 訂單超時監聽 mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler); } } /** * 實現監聽 * * @author hucj */ @Slf4j @Component public class CourseOrderTimeoutHandler implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { // 得到訊息體 String body = new String(msg.getBody()); JSONObject userJson = JSONObject.parseObject(body); TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class); // 處理具體的業務邏輯,,,,, DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("消費時間:{}", format.format(new Date())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
不足或疑問請留言