利用Redisson實現分散式延時任務排程功能
定時任務
定時任務是在編碼世界中經常遇到的問題,比如定時備份資料庫、定時重新整理快取等,可以通過Linux定時任務完成,也可以通過框架如Spring完成,但是在分散式場景中傳統單機可以完成功能就不太行了,所以需要藉助其他工具來實現任務排程的功能
場景:在一些訂單場景中,使用者下單後會鎖定一些資源,然後使用者非正常退出(沒有觸發取消訂單操作),導致訂單資源佔用無法釋放的問題。
藉助工具:redisson分散式服務中的
程式碼
關單任務
定時執行具體任務,主要實現關單,釋放相關資源(優惠券等),設定相關狀態標誌位
注意:Runnable、Callable介面二選一,必須實現序列化欄位,因為任務最終要被序列化儲存在Redis中
@Slf4j @Data public class CloseOrderTask implements Runnable, Serializable { private static final long serialVersionUID = -8193920383968460660L; /** * 訂單id */ private String orderId; @Override @SneakyThrows public void run() { // 獲取SpringBean,因為此物件不能為Spring所管理,所以需要通過工具獲取SpringBean RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class); OrderService orderService = SpringUtils.getBean(OrderService.class); // 任務必須加鎖,因為同一任務可能被多個例項所執行 RLock lock = redissonClient.getLock(ProductProperties.ORDER_TASK_LOCK + orderId); boolean lockFlag = false; try { //嘗試獲取鎖 lockFlag = lock.tryLock(0L, TimeUnit.SECONDS); if (!lockFlag){ return; } //獲取到鎖,正常執行關單操作 log.info("get lock order:{} ", orderId); OrderDTO order = orderService.getOrder(orderId); if(orderDetails.getStatus() == OrderStatus.NEW){ // 自動關單操作 orderService.autoCloseOrder(orderDetails); } }catch (Exception e){ //TODO 異常情況應新增郵件通知 }finally { if(lockFlag) { lock.unlock(); } } } }
關單訂單任務排程器
註冊一個任務排程器的Bean
注意:Bean的destory方法必須重寫,否則在進行關閉Spring容器時,任務排程中心會被關閉,再次啟動後不會喚醒
/** * 關單定時任務 */ @Bean(destroyMethod = "") public RScheduledExecutorService rScheduledExecutorService(@Autowired RedissonClient redissonClient){ WorkerOptions workerOptions = WorkerOptions.defaults().workers(CPU_NUM + 1); ExecutorOptions executorOptions = ExecutorOptions.defaults() .taskRetryInterval(10 * 60, TimeUnit.SECONDS); RScheduledExecutorService executorService = redissonClient .getExecutorService(ProductProperties.CLOSE_ORDER_TASK_EXECUTOR, executorOptions); executorService.registerWorkers(workerOptions); return executorService; }
服務層
例子實現了兩個方法,開啟一個定時任務和取消一個定時任務。
因為定時任務的取消時通過taskId取消的,所以在提交任務或獲取taskId,並對orderId和taskId做了一下對映,在取消訂單的時候就比較容易了
@Slf4j
@Service
public class CloseOrderServiceImpl implements CloseOrderService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RScheduledExecutorService executorService;
@Override
public void submitScheduleCloseTask(String orderId) {
CloseOrderTask closeOrderTask = new CloseOrderTask();
closeOrderTask.setOrderId(orderId);
RScheduledFuture<?> schedule = executorService.schedule(closeOrderTask, ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
redisTemplate.opsForValue().set(ProductProperties.ORDER_TASK_MAPPING + orderId, schedule.getTaskId(), ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
log.info("submit automatic close task. orderId: {}, taskId : {}", orderId, schedule.getTaskId());
}
@Override
public void cancelScheduleCloseTask(String orderId) {
String taskId = (String) redisTemplate.opsForValue().get(ProductProperties.ORDER_TASK_MAPPING + orderId);
log.info("cancel automatic close task. orderId: {}, taskId : {}", orderId, taskId);
if (!StringUtils.isEmptyStr(taskId)){
executorService.cancelTask(taskId);
}
}
}
具體使用地點
在使用者進行下單操作時可以提交一個定時任務
在使用者取消訂單時可以取消orderId對應的定時任務
原理分析
資料結構
提交一個任務後,Redis中會新增4個鍵值對
-
{任務佇列名:包路徑}:counter
-
string型別
-
記錄當前有多少任務待執行
-
-
{任務佇列名:包路徑}:shceduler
-
zset型別
-
通過score排名,任務佇列。通過value關聯tasks具體任務
-
-
{任務佇列名:包路徑}:retry-interval
-
string型別
-
任務重試時間,預設5000
-
-
{任務佇列名:包路徑}:tasks
-
Hash型別
-
所有具體任務,通過key與shceduler關聯執行順序
-
value序列化的任務
-
執行流程
提交一個延時任務排程任務,會在:scheduler中產生兩條資料,分別是任務下一次執行時間和任務下一次執行時間+重試時間
Spring在註冊ExecutorService時指定了工人(worker)的數量,會在本地起執行緒來執行這些待執行的任務。
問題探究
1、任務過期,客戶端掛了,然後過一度時間重啟,任務是否還會執行。
客戶端重啟後,過期的任務都會被拿到客戶端裡面進行消費。
存在同時啟動多個客戶端,是否會發生任務搶佔問題,內部是否有鎖機制?答案是會的
2、同一個任務是否會被多個客戶端執行
通過兩臺伺服器,每臺提交10個任務,多輪測試沒有發生同一個任務被多次執行的情況。
加上執行緒執行sleep後同樣進行測試,發現同一任務被不同客戶端消費,所以需要有鎖機制
3、任務(schedule 單次)執行完畢後,Redis是否會刪除任務佇列中的任務
任務執行完畢後,會刪除tasks、scheduler中的任務,同時counter-1
4、任務佇列的過期策略
所有任務的過期時間都是-1(永不過期),保證任務不會丟
5、任務執行異常情況
能夠正常捕捉到異常,並進行處理,同時任務會從Redis中刪除