1. 程式人生 > 資料庫 >利用Redisson實現分散式延時任務排程功能

利用Redisson實現分散式延時任務排程功能

 定時任務

定時任務是在編碼世界中經常遇到的問題,比如定時備份資料庫、定時重新整理快取等,可以通過Linux定時任務完成,也可以通過框架如Spring完成,但是在分散式場景中傳統單機可以完成功能就不太行了,所以需要藉助其他工具來實現任務排程的功能

 場景:在一些訂單場景中,使用者下單後會鎖定一些資源,然後使用者非正常退出(沒有觸發取消訂單操作),導致訂單資源佔用無法釋放的問題。

藉助工具:redisson分散式服務中的

程式碼

關單任務

定時執行具體任務,主要實現關單,釋放相關資源(優惠券等),設定相關狀態標誌位

注意:Runnable、Callable介面二選一,必須實現序列化欄位,因為任務最終要被序列化儲存在Redis中

@Slf4j
@Data
public class CloseOrderTask implements Runnable, Serializable {
    private static final long serialVersionUID = -8193920383968460660L;

    /**
     * 訂單id
     */
    private String orderId;

    @Override
    @SneakyThrows
    public void run() {
        // 獲取SpringBean,因為此物件不能為Spring所管理,所以需要通過工具獲取SpringBean
        RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
        OrderService orderService = SpringUtils.getBean(OrderService.class);
        // 任務必須加鎖,因為同一任務可能被多個例項所執行
        RLock lock = redissonClient.getLock(ProductProperties.ORDER_TASK_LOCK + orderId);
        boolean lockFlag = false;
        try {
            //嘗試獲取鎖
            lockFlag = lock.tryLock(0L, TimeUnit.SECONDS);
            if (!lockFlag){ return; }
            //獲取到鎖,正常執行關單操作
            log.info("get lock order:{} ", orderId);
            OrderDTO order = orderService.getOrder(orderId);
            if(orderDetails.getStatus() == OrderStatus.NEW){
                // 自動關單操作
                orderService.autoCloseOrder(orderDetails);
            }
        }catch (Exception e){
            //TODO 異常情況應新增郵件通知
        }finally {
            if(lockFlag) {
                lock.unlock();
            }
        }

    }
}

關單訂單任務排程器

註冊一個任務排程器的Bean

注意:Bean的destory方法必須重寫,否則在進行關閉Spring容器時,任務排程中心會被關閉,再次啟動後不會喚醒

    /**
     * 關單定時任務
     */
    @Bean(destroyMethod = "")
    public RScheduledExecutorService rScheduledExecutorService(@Autowired RedissonClient redissonClient){
        WorkerOptions workerOptions = WorkerOptions.defaults().workers(CPU_NUM + 1);
        ExecutorOptions executorOptions = ExecutorOptions.defaults()
                .taskRetryInterval(10 * 60, TimeUnit.SECONDS);
        RScheduledExecutorService executorService = redissonClient
                .getExecutorService(ProductProperties.CLOSE_ORDER_TASK_EXECUTOR, executorOptions);
        executorService.registerWorkers(workerOptions);
        return executorService;
    }

服務層

例子實現了兩個方法,開啟一個定時任務和取消一個定時任務。

因為定時任務的取消時通過taskId取消的,所以在提交任務或獲取taskId,並對orderId和taskId做了一下對映,在取消訂單的時候就比較容易了

@Slf4j
@Service
public class CloseOrderServiceImpl implements CloseOrderService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RScheduledExecutorService executorService;

    @Override
    public void submitScheduleCloseTask(String orderId) {
        CloseOrderTask closeOrderTask = new CloseOrderTask();
        closeOrderTask.setOrderId(orderId);
        RScheduledFuture<?> schedule = executorService.schedule(closeOrderTask, ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
        redisTemplate.opsForValue().set(ProductProperties.ORDER_TASK_MAPPING + orderId, schedule.getTaskId(), ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
        log.info("submit automatic close task. orderId: {}, taskId : {}", orderId, schedule.getTaskId());
    }

    @Override
    public void cancelScheduleCloseTask(String orderId) {
        String taskId = (String) redisTemplate.opsForValue().get(ProductProperties.ORDER_TASK_MAPPING + orderId);
        log.info("cancel automatic close task. orderId: {}, taskId : {}", orderId, taskId);
        if (!StringUtils.isEmptyStr(taskId)){
            executorService.cancelTask(taskId);
        }

    }
}

具體使用地點

在使用者進行下單操作時可以提交一個定時任務

在使用者取消訂單時可以取消orderId對應的定時任務

原理分析

資料結構

提交一個任務後,Redis中會新增4個鍵值對

  • {任務佇列名:包路徑}:counter

    • string型別

    • 記錄當前有多少任務待執行

  • {任務佇列名:包路徑}:shceduler

    • zset型別

    • 通過score排名,任務佇列。通過value關聯tasks具體任務

  • {任務佇列名:包路徑}:retry-interval

    • string型別

    • 任務重試時間,預設5000

  • {任務佇列名:包路徑}:tasks

    • Hash型別

    • 所有具體任務,通過key與shceduler關聯執行順序

    • value序列化的任務

執行流程

提交一個延時任務排程任務,會在:scheduler中產生兩條資料,分別是任務下一次執行時間和任務下一次執行時間+重試時間

Spring在註冊ExecutorService時指定了工人(worker)的數量,會在本地起執行緒來執行這些待執行的任務。

問題探究

1、任務過期,客戶端掛了,然後過一度時間重啟,任務是否還會執行。

客戶端重啟後,過期的任務都會被拿到客戶端裡面進行消費。

存在同時啟動多個客戶端,是否會發生任務搶佔問題,內部是否有鎖機制?答案是會的

2、同一個任務是否會被多個客戶端執行

 通過兩臺伺服器,每臺提交10個任務,多輪測試沒有發生同一個任務被多次執行的情況。

加上執行緒執行sleep後同樣進行測試,發現同一任務被不同客戶端消費,所以需要有鎖機制

3、任務(schedule 單次)執行完畢後,Redis是否會刪除任務佇列中的任務

任務執行完畢後,會刪除tasks、scheduler中的任務,同時counter-1

4、任務佇列的過期策略

所有任務的過期時間都是-1(永不過期),保證任務不會丟

5、任務執行異常情況

能夠正常捕捉到異常,並進行處理,同時任務會從Redis中刪除