1. 程式人生 > 其它 >系統延遲任務設計

系統延遲任務設計

延遲任務設計思路

入隊操作:ZADD KEY timestamp task。我們將需要處理的任務以其預定執行時間的時間戳作為 Score 加入到 ZSet 中。Redis 的 ZADD 的時間複雜度是 O(logN),N 是 ZSet 中元素個數,因此我們能相對比較高效地進行入隊操作。

起一個程序定時(比如每隔一秒)通過 ZRANGEBYSCORE 方法查詢 ZSet 中 Score 最小的元素
具體操作為:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查詢結果有兩種情況:
a. 查詢出的分數小於等於當前時間戳,說明已到這個任務需要執行的時間,則去非同步處理該任務;
b. 查詢出的分數大於當前時間戳,由於剛剛的查詢操作取出來的是分數最小的元素,所以說明 ZSet 中所有的任務都還沒有到需要執行的時間,則休眠一秒後繼續查詢;

同樣的,ZRANGEBYSCORE操作的時間複雜度為 O(logN + M),其中N為 ZSet 中元素個數,M 為查詢的元素個數,因此我們定時查詢操作也是比較高效的。

不足

0.兩個佇列每 5s 執行一次,所以並不能非常實時的執行任務。
1.兩個佇列每 5s 執行一次,掃描每個佇列中最近的 2 條記錄,如果在同一時間段有很多的任務需要執行,則無法準時執行。
2.不能動態增加 Redis 佇列

具體業務程式碼

延遲任務表

-- Delay-task table: one row per delayed task.
-- WARNING: the DROP below destroys any existing table and its data before re-creating it.
DROP TABLE IF EXISTS `bs_delay_task`;
CREATE TABLE `bs_delay_task`  (
  -- Primary key.
  `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `CREATE_DATE` datetime NULL DEFAULT NULL,
  -- Identifier and type of the business object the task belongs to.
  -- NOTE(review): longtext columns cannot be indexed without a prefix length; the
  -- lookups by domain type/id in the Java code may be slow on large tables - confirm.
  `DOMAIN_ID` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  `DOMAIN_TYPE` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  `MODIFY_DATE` datetime NULL DEFAULT NULL,
  `NAME` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  -- Presumably the Redis sorted-set key and member under which the task was queued
  -- (the Java code passes redisKey / redisValue when creating a task) - verify mapping.
  `REDIS_KEY` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  `REDIS_VALUE` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  `REMARK` varchar(1024) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  -- Task state; presumably holds the DelayTask.STATUS_* values used by the Java code.
  `STATUS` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  -- Point in time at which the task is due to run.
  `TASK_EXECUTE_DATE` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`ID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

定時任務

@Component
public class DelayTaskQuartz {

    @Autowired
    private DelayTaskService delayTaskService;

    // 5s 執行一次
    @Scheduled(cron = "2/5 * * * * ?")
    public void ConferenceQuartz() {
        delayTaskService.executeLoopTask
    }
}

DelayTaskService

/**
 * Delay-task service backed by Redis sorted sets.
 *
 * <p>Creating a task stores a record through {@code create(...)} and adds a member to one
 * of the queues in {@link #REDIS_KEYS}, using the execution time in epoch milliseconds as
 * the score. {@link #executeLoopTask()} scans the queues and publishes a
 * {@code DelayTestExecuteEvent} for every entry whose score has been reached.
 *
 * <p>{@code queryUnique}, {@code update} and {@code save} are not defined in this snippet;
 * they are presumably inherited from a base service - TODO confirm.
 *
 * <p>NOTE(review): when an existing READY task is cancelled in
 * {@link #createDelayTest(Date, String, String)}, its Redis entry is not removed. When that
 * stale entry becomes due, an event is still published for the same domain type/id, so a
 * listener that looks up the READY task by type/id would pick up the newer task early.
 * Removing the old entry needs the old task's stored key/value - verify and fix.
 */
@Service
@Transactional
public class DelayTaskService {

    Log log = LogFactory.get();

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public static final String DELAY_TEST_KEY_ONE = "redis_delay_queue_1";
    public static final String DELAY_TEST_KEY_TWO = "redis_delay_queue_2";
    public static List<String> REDIS_KEYS = new ArrayList<>(4);

    /** Shared generator used to spread new tasks over the queues. */
    private static final Random RANDOM = new Random();

    static {
        REDIS_KEYS.add(DELAY_TEST_KEY_ONE);
        REDIS_KEYS.add(DELAY_TEST_KEY_TWO);
    }

    /**
     * Creates a delayed task that becomes due {@code secondAfter} seconds from now.
     *
     * @param secondAfter delay in seconds, counted from the current time
     * @param domainType  type of the business object the task belongs to
     * @param domainId    identifier of the business object the task belongs to
     */
    @Transactional
    public void createDelayTest(@NotNull Integer secondAfter, @NotNull String domainType, @NotNull String domainId) {
        // Widen to long before multiplying so large delays cannot overflow int.
        Date targetDate = new Date(System.currentTimeMillis() + secondAfter * 1000L);
        createDelayTest(targetDate, domainType, domainId);
    }

    /**
     * Creates a delayed task that becomes due at {@code executeDate}.
     *
     * <p>If a READY task already exists for the same domain type and id, it is marked as
     * cancelled before the new task is stored and queued.
     *
     * <p>NOTE(review): the Redis entry is added inside the transaction, before commit; if
     * the transaction later rolls back, the Redis entry remains. Also, {@code synchronized}
     * only serialises callers within a single JVM instance.
     *
     * @param executeDate time at which the task should run
     * @param domainType  type of the business object the task belongs to
     * @param domainId    identifier of the business object the task belongs to
     */
    @Transactional
    public synchronized void createDelayTest(@NotNull Date executeDate, @NotNull String domainType, @NotNull String domainId) {

        DelayTaskQO delayTaskQO = new DelayTaskQO();
        delayTaskQO.setDomainType(domainType);
        delayTaskQO.setDomainId(domainId);
        delayTaskQO.setStatus(DelayTask.STATUS_READY);
        DelayTask delayTask = this.queryUnique(delayTaskQO);

        // A newer task for the same domain object supersedes the pending one.
        if (null != delayTask) {
            delayTask.setStatus(DelayTask.STATUS_CANCEL);
            delayTask.setModifyDate(new Date());
            delayTask.setRemark("新增重複任務,舊任務自動取消");
            this.update(delayTask);
        }
        String redisKey = getRandomRedisKey();
        // e.g. conference_uuid_20210524-120000
        String redisValue = domainType + "_" + domainId + "_" + Date2StrShort(executeDate);
        long score = Date2Score(executeDate);

        CreateDelayTaskCommand cmd = new CreateDelayTaskCommand();
        cmd.setName(domainType + " " + Date2Str(executeDate) + " 執行");
        cmd.setDomainType(domainType);
        cmd.setDomainId(domainId);
        cmd.setRedisKey(redisKey);
        cmd.setRedisValue(redisValue);
        cmd.setTaskExecuteDate(executeDate);
        this.create(cmd);
        RedisUtils.zAdd(redisKey, redisValue, score);
    }

    /**
     * Builds a {@code DelayTask} from the command and saves it.
     *
     * @param command values for the new task
     */
    @Transactional
    public void create(CreateDelayTaskCommand command) {
        DelayTask delayTask = new DelayTask();
        delayTask.create(command);
        save(delayTask);
    }

    /**
     * Entry point for the scheduler: scans every queue listed in {@link #REDIS_KEYS}.
     */
    @Transactional
    public void executeLoopTask() {
        // Iterate the registered keys instead of hard-coding them, so the scan always
        // covers exactly the queues that getRandomRedisKey() can enqueue into.
        for (String key : REDIS_KEYS) {
            this.executeTask(key);
        }
    }

    /**
     * Reads up to two lowest-scored entries of the given queue and, for each one that is
     * due, removes it from Redis and publishes a {@code DelayTestExecuteEvent}.
     *
     * <p>NOTE(review): the entry is removed before the event is published and the result
     * of the removal is not checked, so several application instances scanning the same
     * queue could each publish the same task - confirm deployment model.
     *
     * @param key Redis sorted-set key of the queue to scan
     */
    @Transactional
    public void executeTask(String key) {
        Set<ZSetOperations.TypedTuple<String>> values =
                RedisUtils.zRangeByScoreWithScores(key, Long.MIN_VALUE, Long.MAX_VALUE, 0, 2);
        if (null == values) {
            return;
        }
        for (ZSetOperations.TypedTuple<String> tuple : values) {
            Double score = tuple.getScore();
            String value = tuple.getValue();
            // Guard against incomplete tuples; unboxing a null score would throw.
            if (null == score || null == value) {
                log.error(key + " contains an entry without score or value, skipped");
                continue;
            }
            // A task is due once the current time has reached its score (inclusive).
            if (System.currentTimeMillis() < score) {
                log.info(key + ":" + value + " 未到執行時間,不需要執行");
                continue;
            }

            RedisUtils.zRemove(key, value);

            // Expected layout: domainType_domainId_yyyyMMdd-HHmmss.
            // NOTE(review): a domainType or domainId that itself contains '_' is split
            // incorrectly here - confirm callers never pass such values.
            String[] splitStr = value.split("_");
            if (splitStr.length < 2) {
                // Already removed above so a malformed entry cannot block the queue head.
                log.error(key + ":" + value + " is malformed, entry dropped");
                continue;
            }
            String domainType = splitStr[0];
            String domainId = splitStr[1];
            // Notify subscribers.
            applicationEventPublisher.publishEvent(new DelayTestExecuteEvent(domainType, domainId, key));

            log.info("DelayTestService " + value + " 任務已經執行");
        }
    }


    /** Picks one of the registered queue keys at random. */
    private String getRandomRedisKey() {
        return REDIS_KEYS.get(RANDOM.nextInt(REDIS_KEYS.size()));
    }

    /** Formats the date as {@code yyyy-MM-dd HH:mm:ss}. */
    private String Date2Str(Date date) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return df.format(date);
    }

    /** Formats the date as {@code yyyyMMdd-HHmmss}. */
    private String Date2StrShort(Date date) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        DateFormat df = new SimpleDateFormat("yyyyMMdd-HHmmss");
        return df.format(date);
    }

    /** Converts the date to its epoch-millisecond value, used as the sorted-set score. */
    private long Date2Score(Date date) {
        return date.getTime();
    }

}

觀察者物件

/**
 * Application event describing a delayed task to execute.
 *
 * <p>Carries the domain type and domain id of the task together with the Redis key
 * it is associated with.
 */
public class DelayTestExecuteEvent extends ApplicationEvent {
    private String domainType;
    private String domainId;
    private String redisKey;

    /**
     * Creates an event with the given source and no task details set.
     *
     * @param source the event source handed to {@code ApplicationEvent}
     */
    public DelayTestExecuteEvent(Object source) {
        super(source);
    }

    /**
     * Creates an event for one task; an empty string is used as the event source.
     *
     * @param domainType type of the business object the task belongs to
     * @param domainId   identifier of the business object the task belongs to
     * @param redisKey   Redis key associated with the task
     */
    public DelayTestExecuteEvent(String domainType, String domainId, String redisKey) {
        this("");
        this.redisKey = redisKey;
        this.domainId = domainId;
        this.domainType = domainType;
    }

    // ---- getters ----

    public String getDomainType() {
        return this.domainType;
    }

    public String getDomainId() {
        return this.domainId;
    }

    public String getRedisKey() {
        return this.redisKey;
    }

    // ---- setters ----

    public void setDomainType(String domainType) {
        this.domainType = domainType;
    }

    public void setDomainId(String domainId) {
        this.domainId = domainId;
    }

    public void setRedisKey(String redisKey) {
        this.redisKey = redisKey;
    }
}


/**
 * Listener for {@link DelayTestExecuteEvent}: looks up the READY task matching the event,
 * runs it, and records the outcome on the task.
 */
@Component
public class DelayTestExecuteObserver implements ApplicationListener<DelayTestExecuteEvent> {

    Log log = LogFactory.get();

    @Autowired
    private DelayTaskService delayTaskService;


    /**
     * Handles one event. If no READY task exists for the event's domain type and id,
     * nothing happens. Otherwise the task is executed and then marked
     * {@code STATUS_FINISH}, or {@code STATUS_FINISH_ERROR} when an exception is caught.
     *
     * @param event the event carrying domain type, domain id and Redis key
     */
    @Override
    public void onApplicationEvent(DelayTestExecuteEvent event) {
        final String type = event.getDomainType();
        final String id = event.getDomainId();
        final String queueKey = event.getRedisKey();

        log.info("正在執行 domainType:" + type + " domainId:" + id + " redisKey:" + queueKey);

        DelayTask pending = findReadyTask(type, id);
        if (pending != null) {
            try {
                // Run the task, then record success.
                executeTask(pending);

                pending.setStatus(DelayTask.STATUS_FINISH);
                stampAndSave(pending);
            } catch (Exception failure) {
                log.error(failure);

                pending.setStatus(DelayTask.STATUS_FINISH_ERROR);
                stampAndSave(pending);
            }
        }
    }

    /** Queries the single READY task for the given domain type and id; may return null. */
    private DelayTask findReadyTask(String type, String id) {
        DelayTaskQO query = new DelayTaskQO();
        query.setDomainType(type);
        query.setDomainId(id);
        query.setStatus(DelayTask.STATUS_READY);
        return delayTaskService.queryUnique(query);
    }

    /** Sets the modification time to now and persists the task through the service. */
    private void stampAndSave(DelayTask task) {
        task.setModifyDate(new Date());
        delayTaskService.update(task);
    }

    /**
     * Dispatches the task by its domain type. The body is a placeholder in this snippet.
     *
     * @param delayTask the task to run
     * @throws BaseException declared for the concrete task code
     */
    private void executeTask(DelayTask delayTask) throws BaseException {
        String domainId = delayTask.getDomainId();
        String domainType = delayTask.getDomainType();

        // TODO could be improved by using a strategy per domain type
        if (StringUtils.equals(domainType, DelayTask.XXX)) {
           // concrete task code goes here
        }
    }
}