redis 實現延遲佇列
阿新 • • 發佈:2021-10-11
redis 實現延遲佇列
1、延遲佇列工廠
package cn.xs.qishi.micro.plan.common.queue;

import cn.xs.ambi.bas.util.StringUtils;
import cn.xs.ambi.mgt.redis.RedisManager;
import lombok.CustomLog;
import org.springframework.util.CollectionUtils;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Delay queue backed by a Redis sorted set.
 *
 * <p>Each message is stored as a sorted-set member whose score is the time at which it
 * becomes due. A polling loop periodically reads every member whose score is not greater
 * than the current time, removes it, and hands it to {@link #invoke(String)} on an executor.
 *
 * @author liuxn
 * @date 2021/10/9
 */
@CustomLog
public abstract class RedisDelayQueueFactory {

    /** Pause between two polls of the sorted set, in seconds. */
    private static final long POLL_INTERVAL_SECONDS = 30L;

    /**
     * Populates the queue; the concrete behavior is defined by the subclass.
     */
    public abstract void initDelayQueue();

    /**
     * Handles one message that has become due.
     *
     * @param message the message payload exactly as it was passed to {@link #send(String, long)}
     */
    public abstract void invoke(String message);

    /**
     * Returns the Redis key of the sorted set used by this queue.
     * (Despite the "set" prefix this is a getter; the name is kept for compatibility.)
     */
    public abstract String setQueueName();

    /**
     * Enqueues a message.
     *
     * @param content   message payload, stored as the sorted-set member
     * @param delayTime sorted-set score of the message. The consumer compares scores with
     *                  {@link System#currentTimeMillis()}, so this must be the absolute due
     *                  time in epoch milliseconds, not a relative delay.
     */
    public void send(String content, long delayTime) {
        RedisManager.getRedisTemplate().opsForZSet().add(setQueueName(), content, (double) delayTime);
        log.info(">> redis 延遲佇列:[" + setQueueName() + "],訊息內容:[" + content + "],delayTime:[" + delayTime + "]");
    }

    /**
     * Runs the consumer loop on the calling thread: polls the queue, then sleeps for
     * {@value #POLL_INTERVAL_SECONDS} seconds, until the thread is interrupted.
     *
     * <p>This method blocks, so callers normally submit it to an executor.
     *
     * @param asyncExecutor executor on which {@link #invoke(String)} is run for each due message
     */
    public void startDelayQueueMachine(Executor asyncExecutor) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                pollOnce(asyncExecutor);
            } catch (Exception e) {
                // A failed poll must not kill the listener; retry on the next cycle.
                log.error(">> 延遲佇列監聽異常", e);
            }
            try {
                TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
            } catch (InterruptedException e) {
                log.error(">> 延遲佇列監聽異常", e);
                // Restore the flag and stop, so that an executor shutdown can end the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Reads every message that is due, claims each one by removing it from the sorted set,
     * and dispatches the claimed messages to the executor.
     */
    private void pollOnce(Executor asyncExecutor) {
        String queueName = setQueueName();
        long now = System.currentTimeMillis();
        Set<String> messages = RedisManager.getRedisTemplate().opsForZSet().rangeByScore(queueName, 0L, now);
        // The result may be null (e.g. inside a pipeline/transaction); treat that as "nothing due".
        int count = messages == null ? 0 : messages.size();
        log.debug(">> 監控佇列:[" + queueName + "],訊息數:[" + count + "]");
        if (CollectionUtils.isEmpty(messages)) {
            return;
        }
        for (String message : messages) {
            if (StringUtils.isBlank(message)) {
                continue;
            }
            // Only the caller whose remove() succeeds consumes the message.
            Long removed = RedisManager.getRedisTemplate().opsForZSet().remove(queueName, message);
            if (removed == null || removed <= 0) {
                continue;
            }
            try {
                asyncExecutor.execute(() -> invoke(message));
            } catch (RejectedExecutionException e) {
                // The message has already been removed; put it back (due immediately) so it
                // is retried on a later poll instead of being lost, then abort this cycle.
                RedisManager.getRedisTemplate().opsForZSet().add(queueName, message, (double) now);
                throw e;
            }
        }
    }
}
2、計劃佇列實現
package cn.xs.qishi.micro.plan.common.queue;

import cn.xs.qishi.entity.pojo.CcPlanSendLog;
import cn.xs.qishi.micro.plan.common.constant.Constant;
import cn.xs.qishi.micro.plan.common.constant.RedisConstant;
import cn.xs.qishi.micro.plan.common.vo.PlanReminderVo;
import cn.xs.qishi.micro.plan.service.PlanService;
import com.alibaba.fastjson.JSON;
import lombok.CustomLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Delay queue carrying plan reminder messages.
 *
 * <p>Messages are JSON-encoded {@link PlanReminderVo} objects.
 *
 * @author liuxn
 * @date 2021/10/11
 */
@Component
@CustomLog
public class PlanReminderQueue extends RedisDelayQueueFactory {

    @Autowired
    private PlanService planService;

    /**
     * Enqueues one reminder for every send-log record returned by
     * {@code planService.getPlanSendLog(Constant.PLAN_SEND_LOG_STATUS_0)}, using the
     * record's send time (epoch milliseconds) as the due time.
     */
    @Override
    public void initDelayQueue() {
        List<CcPlanSendLog> list = planService.getPlanSendLog(Constant.PLAN_SEND_LOG_STATUS_0);
        for (CcPlanSendLog sendLog : list) {
            PlanReminderVo reminderVo = new PlanReminderVo();
            reminderVo.setPlanId(sendLog.getPid());
            reminderVo.setPlanLogId(sendLog.getId());
            // invoke() decodes the payload with JSON.parseObject, so encode it as JSON here
            // rather than relying on whatever format toString() happens to produce.
            send(JSON.toJSONString(reminderVo), sendLog.getSendTime().getTime());
        }
        log.info(">> 專案啟動,提醒任務初始化..條數:"+list.size());
    }

    /**
     * Handles a due reminder: decodes the JSON payload and passes it to
     * {@code planService.reminder}.
     *
     * @param message JSON representation of a {@link PlanReminderVo}
     */
    @Override
    public void invoke(String message) {
        log.info(">> 計劃任務提醒佇列接收到延遲訊息:" + message);
        PlanReminderVo vo = JSON.parseObject(message, PlanReminderVo.class);
        planService.reminder(vo);
    }

    /**
     * Returns the Redis key of the reminder queue.
     */
    @Override
    public String setQueueName() {
        return RedisConstant.QUEUE_PLAN_REMIND;
    }
}
3、專案啟動時初始化佇列,並開啟監聽
package cn.xs.qishi.micro.plan.common.runner;

import cn.xs.qishi.micro.plan.common.queue.PlanReminderQueue;
import lombok.CustomLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;

/**
 * Application runner that starts the plan reminder queue listener and then
 * initializes the queue.
 *
 * @author liuxn
 * @date 2021/10/9
 */
@Component
@CustomLog
public class ScheduleReminderRunner implements ApplicationRunner {

    @Autowired
    private Executor asyncExecutor;

    @Autowired
    private PlanReminderQueue planReminderQueue;

    /**
     * Submits the queue listener to the executor, then runs the queue initialization
     * on the calling thread.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info(">> 專案啟動完畢,初始化計劃任務鬧鐘提醒.啟動佇列監聽 ");
        // The listener is submitted before the queue is filled.
        asyncExecutor.execute(this::listenForDueReminders);
        planReminderQueue.initDelayQueue();
        log.info(">> 專案啟動完畢,初始化計劃任務鬧鐘提醒完畢 !!! ");
    }

    /**
     * Runs the queue's consumer loop, dispatching due messages on the same executor.
     */
    private void listenForDueReminders() {
        planReminderQueue.startDelayQueueMachine(asyncExecutor);
    }
}
4、注意
private Executor asyncExecutor; 是執行緒池
planReminderQueue.send("訊息內容", 到期時間戳); 程式碼呼叫。注意第二個引數是 long 型別的絕對到期時間(毫秒時間戳,例如 System.currentTimeMillis() + 延遲毫秒數),而不是相對的延遲時長
initDelayQueue 方法根據業務需求進行實現