系統延遲任務設計
阿新 • • 發佈:2021-12-07
延遲任務設計思路
入隊操作:ZADD KEY timestamp task, 我們將需要處理的任務
按其需要延遲處理時間作為 Score 加入到 ZSet 中。Redis 的 ZAdd 的時間複雜度是 O(logN),N是 ZSet 中元素個數,因此我們能相對比較高效的進行入隊操作。
起一個程序定時(比如每隔一秒)通過 ZREANGEBYSCORE 方法查詢 ZSet 中 Score 最小的元素
具體操作為:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查詢結果有兩種情況:
a. 查詢出的分數小於等於當前時間戳,說明到這個任務需要執行的時間了,則去非同步處理該任務;
b. 查詢出的分數大於當前時間戳,由於剛剛的查詢操作取出來的是分數最小的元素,所以說明 ZSet 中所有的任務都還沒有到需要執行的時間,則休眠一秒後繼續查詢;
同樣的,ZRANGEBYSCORE操作的時間複雜度為 O(logN + M),其中N為 ZSet 中元素個數,M 為查詢的元素個數,因此我們定時查詢操作也是比較高效的。
不足
0.兩個佇列每 5s 執行一次,所以並不能非常實時的執行任務。
1.兩個佇列每 5s 執行一次,掃描每個佇列中最近的 2 條記錄,如果在同一時間段有很多的任務需要執行,則無法準時執行。
2.不能動態增加 Redis 佇列
具體業務程式碼
延遲任務表
DROP TABLE IF EXISTS `bs_delay_task`; CREATE TABLE `bs_delay_task` ( `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `CREATE_DATE` datetime NULL DEFAULT NULL, `DOMAIN_ID` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `DOMAIN_TYPE` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `MODIFY_DATE` datetime NULL DEFAULT NULL, `NAME` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `REDIS_KEY` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `REDIS_VALUE` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `REMARK` varchar(1024) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `STATUS` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `TASK_EXECUTE_DATE` datetime NULL DEFAULT NULL, PRIMARY KEY (`ID`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
定時任務
@Component
public class DelayTaskQuartz {
@Autowired
private DelayTaskService delayTaskService;
// 5s 執行一次
@Scheduled(cron = "2/5 * * * * ?")
public void ConferenceQuartz() {
delayTaskService.executeLoopTask
}
}
DelayTaskService
@Service @Transactional public class DelayTaskService { Log log = LogFactory.get(); @Autowired private ApplicationEventPublisher applicationEventPublisher; public static final String DELAY_TEST_KEY_ONE = "redis_delay_queue_1"; public static final String DELAY_TEST_KEY_TWO = "redis_delay_queue_2"; public static List<String> REDIS_KEYS = new ArrayList<>(4); static { REDIS_KEYS.add(DELAY_TEST_KEY_ONE); REDIS_KEYS.add(DELAY_TEST_KEY_TWO); } /** * 建立延遲任務,在XX秒後執行 */ @Transactional public void createDelayTest(@NotNull Integer secondAfter, @NotNull String domainType, @NotNull String domainId) { Calendar rightNow = Calendar.getInstance(); rightNow.setTime(new Date()); rightNow.add(Calendar.SECOND, secondAfter); Date targetDate = rightNow.getTime(); createDelayTest(targetDate, domainType, domainId); } /** * 建立延遲任務,在 executeDate 時執行 */ @Transactional public synchronized void createDelayTest(@NotNull Date executeDate, @NotNull String domainType, @NotNull String domainId) { DelayTaskQO delayTaskQO = new DelayTaskQO(); delayTaskQO.setDomainType(domainType); delayTaskQO.setDomainId(domainId); delayTaskQO.setStatus(DelayTask.STATUS_READY); DelayTask delayTask = this.queryUnique(delayTaskQO); if (null != delayTask) { delayTask.setStatus(DelayTask.STATUS_CANCEL); delayTask.setModifyDate(new Date()); delayTask.setRemark("新增重複任務,舊任務自動取消"); this.update(delayTask); } String redisKey = getRandomRedisKey(); // e.g. conference_uuid_20210524-120000 String redisValue = domainType + "_" + domainId + "_" + Date2StrShort(executeDate); long score = Date2Score(executeDate); CreateDelayTaskCommand cmd = new CreateDelayTaskCommand(); cmd.setName(domainType + " " + Date2Str(executeDate) + " 執行"); cmd.setDomainType(domainType); cmd.setDomainId(domainId); cmd.setRedisKey(redisKey); cmd.setRedisValue(redisValue); cmd.setTaskExecuteDate(executeDate); this.create(cmd); RedisUtils.zAdd(redisKey, redisValue, score); } @Transactional public void create(CreateDelayTaskCommand command) { DelayTask delayTask = new DelayTask(); delayTask.create(command); save(delayTask); } // 定時任務 @Transactional public void executeLoopTask() { this.executeTask(DELAY_TEST_KEY_ONE); this.executeTask(DELAY_TEST_KEY_TWO); } @Transactional public void executeTask(String key) { Set<ZSetOperations.TypedTuple<String>> values = RedisUtils.zRangeByScoreWithScores(key, Long.MIN_VALUE, Long.MAX_VALUE, 0, 2); if (null != values) { values.forEach(v -> { Double timeScoreWithTask = v.getScore(); String value = v.getValue(); // 判斷獲取的任務是否要執行 if (System.currentTimeMillis() > timeScoreWithTask) { RedisUtils.zRemove(key, value); String[] splitStr = value.split("_"); String domainType = splitStr[0]; String domainId = splitStr[1]; // 傳送通知給訂閱者 applicationEventPublisher.publishEvent(new DelayTestExecuteEvent(domainType, domainId, key)); log.error("DelayTestService " + value + " 任務已經執行"); } else { log.info(key + ":" + value + " 未到執行時間,不需要執行"); } }); } } private String getRandomRedisKey() { Random random = new Random(); int n = random.nextInt(REDIS_KEYS.size()); return REDIS_KEYS.get(n); } private String Date2Str(Date date) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return df.format(date); } private String Date2StrShort(Date date) { DateFormat df = new SimpleDateFormat("yyyyMMdd-HHmmss"); return df.format(date); } private long Date2Score(Date date) { Calendar cal = Calendar.getInstance(); cal.setTime(date); Long score = cal.getTimeInMillis(); return score; } }
觀察者物件
public class DelayTestExecuteEvent extends ApplicationEvent {
private String domainType;
private String domainId;
private String redisKey;
public DelayTestExecuteEvent(Object source) {
super(source);
}
public DelayTestExecuteEvent(String domainType, String domainId, String redisKey) {
super("");
this.domainType = domainType;
this.domainId = domainId;
this.redisKey = redisKey;
}
public String getDomainType() {
return domainType;
}
public void setDomainType(String domainType) {
this.domainType = domainType;
}
public String getDomainId() {
return domainId;
}
public void setDomainId(String domainId) {
this.domainId = domainId;
}
public String getRedisKey() {
return redisKey;
}
public void setRedisKey(String redisKey) {
this.redisKey = redisKey;
}
}
@Component
public class DelayTestExecuteObserver implements ApplicationListener<DelayTestExecuteEvent> {
Log log = LogFactory.get();
@Autowired
private DelayTaskService delayTaskService;
@Override
public void onApplicationEvent(DelayTestExecuteEvent event) {
String domainType = event.getDomainType();
String domainId = event.getDomainId();
String redisKey = event.getRedisKey();
log.info("正在執行 domainType:" + domainType + " domainId:" + domainId + " redisKey:" + redisKey);
DelayTaskQO delayTaskQO = new DelayTaskQO();
delayTaskQO.setDomainType(domainType);
delayTaskQO.setDomainId(domainId);
delayTaskQO.setStatus(DelayTask.STATUS_READY);
DelayTask delayTask = delayTaskService.queryUnique(delayTaskQO);
if (null == delayTask) {
return;
}
// 執行任務
try {
executeTask(delayTask);
delayTask.setStatus(DelayTask.STATUS_FINISH);
delayTask.setModifyDate(new Date());
delayTaskService.update(delayTask);
} catch (Exception e) {
log.error(e);
delayTask.setStatus(DelayTask.STATUS_FINISH_ERROR);
delayTask.setModifyDate(new Date());
delayTaskService.update(delayTask);
}
}
private void executeTask(DelayTask delayTask) throws BaseException {
String domainId = delayTask.getDomainId();
String domainType = delayTask.getDomainType();
// TODO 這裡可以優化,可以使用策略
if (StringUtils.equals(domainType, DelayTask.XXX)) {
// 具體的任務程式碼
}
}
}