1. 程式人生 > 其它 >redis 實現延遲佇列

redis 實現延遲佇列

redis 實現延遲佇列

1、延遲佇列工廠

package cn.xs.qishi.micro.plan.common.queue;

import cn.xs.ambi.bas.util.StringUtils;
import cn.xs.ambi.mgt.redis.RedisManager;
import lombok.CustomLog;
import org.springframework.util.CollectionUtils;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * Base class for a delay queue stored in a Redis sorted set (ZSET).
 *
 * <p>Each message is a ZSET member whose score is the moment it becomes due.
 * {@link #startDelayQueueMachine(Executor)} polls the set periodically, removes
 * every member whose score is not greater than the current epoch-millisecond
 * clock and hands it to {@link #invoke(String)} on the supplied executor.
 *
 * <p>Subclasses provide the queue name, the start-up seeding logic and the
 * message handler.
 *
 * @author liuxn
 * @date 2021/10/9
 */
@CustomLog
public abstract class RedisDelayQueueFactory {

    /** Pause between two polls of the sorted set, in seconds. */
    private static final long POLL_INTERVAL_SECONDS = 30L;

    /**
     * Seeds the queue; the concrete behavior is defined by the subclass.
     */
    public abstract void initDelayQueue();

    /**
     * Handles one due message. Called on the executor passed to
     * {@link #startDelayQueueMachine(Executor)}.
     *
     * @param message the message content that was stored with {@link #send(String, long)}
     */
    public abstract void invoke(String message);

    /**
     * Returns the Redis key of the sorted set backing this queue.
     *
     * <p>Despite the "set" prefix this is a getter; the name is kept for
     * compatibility with existing subclasses.
     *
     * @return the queue (ZSET key) name
     */
    public abstract String setQueueName();

    /**
     * Enqueues a message.
     *
     * @param content   message content; used as the ZSET member, so sending the
     *                  same content again only updates its score
     * @param delayTime the score stored with the message. The consumer compares it
     *                  with {@code System.currentTimeMillis()}, so it must be the
     *                  absolute due time in epoch milliseconds, not a relative delay
     */
    public void send(String content, long delayTime) {
        String queueName = setQueueName();
        RedisManager.getRedisTemplate().opsForZSet().add(queueName, content, (double) delayTime);
        log.info(">> redis 延遲佇列:[" + queueName + "],訊息內容:[" + content + "],delayTime:[" + delayTime + "]");
    }

    /**
     * Runs the consumer loop on the calling thread.
     *
     * <p>The loop polls once, sleeps {@value #POLL_INTERVAL_SECONDS} seconds and
     * repeats. Any exception raised by a poll is logged and the loop continues.
     * The loop ends when the calling thread is interrupted (for example when the
     * owning executor is shut down); the interrupt flag is restored before
     * returning so the caller can observe it.
     *
     * @param asyncExecutor executor on which {@link #invoke(String)} is run for each due message
     */
    public void startDelayQueueMachine(Executor asyncExecutor) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                pollDueMessages(asyncExecutor);
            } catch (Exception e) {
                log.error(">> 延遲佇列監聽異常", e);
            }
            try {
                TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
            } catch (InterruptedException e) {
                log.error(">> 延遲佇列監聽異常", e);
                // Restore the flag and stop: swallowing the interrupt would make the
                // poller impossible to shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Performs one poll: dispatches every message whose score lies in
     * {@code [0, now]}.
     */
    private void pollDueMessages(Executor asyncExecutor) {
        String queueName = setQueueName();
        long min = 0;
        long max = System.currentTimeMillis();
        Set<String> messages = RedisManager.getRedisTemplate().opsForZSet().rangeByScore(queueName, min, max);
        // An explicit null check replaces the former `assert`, which is a no-op
        // unless the JVM runs with -ea.
        int dueCount = (messages == null) ? 0 : messages.size();
        log.debug(">> 監控佇列:[" + queueName + "],訊息數:[" + dueCount + "]");
        if (CollectionUtils.isEmpty(messages)) {
            return;
        }
        for (String message : messages) {
            if (StringUtils.isBlank(message)) {
                continue;
            }
            // Only dispatch when this call actually removed the member, so a message
            // that was already taken elsewhere is not consumed twice.
            Long removed = RedisManager.getRedisTemplate().opsForZSet().remove(queueName, message);
            if (removed != null && removed > 0) {
                asyncExecutor.execute(() -> invoke(message));
            }
        }
    }

}

2、計劃佇列實現

package cn.xs.qishi.micro.plan.common.queue;

import cn.xs.qishi.entity.pojo.CcPlanSendLog;
import cn.xs.qishi.micro.plan.common.constant.Constant;
import cn.xs.qishi.micro.plan.common.constant.RedisConstant;
import cn.xs.qishi.micro.plan.common.vo.PlanReminderVo;
import cn.xs.qishi.micro.plan.service.PlanService;
import com.alibaba.fastjson.JSON;
import lombok.CustomLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Delay queue that delivers plan reminders.
 *
 * <p>Messages are {@link PlanReminderVo} objects encoded as JSON; the score of
 * each message is the send time of the corresponding {@link CcPlanSendLog}.
 *
 * @author liuxn
 * @date 2021/10/11
 */
@Component
@CustomLog
public class PlanReminderQueue extends RedisDelayQueueFactory {


    @Autowired
    private PlanService planService;

    /**
     * Enqueues one reminder for every send-log record returned by
     * {@code planService.getPlanSendLog(Constant.PLAN_SEND_LOG_STATUS_0)}.
     */
    @Override
    public void initDelayQueue() {
        List<CcPlanSendLog> list = planService.getPlanSendLog(Constant.PLAN_SEND_LOG_STATUS_0);
        for (CcPlanSendLog sendLog : list) {
            PlanReminderVo reminderVo = new PlanReminderVo();
            reminderVo.setPlanId(sendLog.getPid());
            reminderVo.setPlanLogId(sendLog.getId());
            // Encode as JSON because invoke() decodes with JSON.parseObject;
            // toString() is not guaranteed to produce parseable JSON.
            send(JSON.toJSONString(reminderVo), sendLog.getSendTime().getTime());
        }
        log.info(">> 專案啟動,提醒任務初始化..條數:" + list.size());
    }

    /**
     * Handles one due reminder: decodes the JSON message and passes it to
     * {@code planService.reminder}.
     *
     * @param message JSON representation of a {@link PlanReminderVo}
     */
    @Override
    public void invoke(String message) {
        log.info(">> 計劃任務提醒佇列接收到延遲訊息:" + message);
        PlanReminderVo vo = JSON.parseObject(message, PlanReminderVo.class);
        planService.reminder(vo);
    }

    /**
     * Returns the Redis key used for the plan reminder queue.
     *
     * @return {@code RedisConstant.QUEUE_PLAN_REMIND}
     */
    @Override
    public String setQueueName() {
        return RedisConstant.QUEUE_PLAN_REMIND;
    }
}

  3、專案啟動時初始化。以及開啟監聽

package cn.xs.qishi.micro.plan.common.runner;

import cn.xs.qishi.micro.plan.common.queue.PlanReminderQueue;
import lombok.CustomLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;

/**
 * Application runner that brings up the plan reminder delay queue.
 *
 * <p>It submits the queue's consumer loop to the injected executor and then
 * asks the queue to seed itself.
 *
 * @author liuxn
 * @date 2021/10/9
 */
@Component
@CustomLog
public class ScheduleReminderRunner implements ApplicationRunner {

    @Autowired
    private Executor asyncExecutor;
    @Autowired
    private PlanReminderQueue planReminderQueue;

    /**
     * Starts the queue listener on the executor, then initializes the queue.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info(">> 專案啟動完畢,初始化計劃任務鬧鐘提醒.啟動佇列監聽 ");

        // The consumer loop runs on the executor; it also receives that executor
        // for dispatching the individual messages.
        final Runnable queueListener = () -> planReminderQueue.startDelayQueueMachine(asyncExecutor);
        asyncExecutor.execute(queueListener);

        // Seed the queue after the listener has been submitted.
        planReminderQueue.initDelayQueue();

        log.info(">> 專案啟動完畢,初始化計劃任務鬧鐘提醒完畢 !!! ");
    }
}

4、注意

private Executor asyncExecutor; 是執行緒池
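planReminderQueue.send("訊息內容", 到期時間戳); 程式碼呼叫。注意:第二個引數是 long 型別的「到期時間」(毫秒時間戳,消費端會與 System.currentTimeMillis() 比較),而不是延遲時長;例如延遲 5 分鐘應傳入 System.currentTimeMillis() + 5 * 60 * 1000L。另外消費端每 30 秒輪詢一次,實際觸發時間最多可能比到期時間晚約 30 秒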
initDelayQueue 方法根據業務需求進行實現