分散式任務排程-定時任務重複執行解決方案
最近一期需求遇到這麼個問題,需要寫一個定時任務,專案是叢集部署的並且伺服器資源有限沒有redis、Zookeeper等。
我們都知道,當我們服務端在部署叢集模式時,會出現所有的定時任務在各自的節點處都會執行一遍,這顯然是不符合要求的,如何解決這個問題?那就是引入分散式鎖。
分散式鎖三種實現方式:1、基於資料庫實現分散式鎖;2、基於快取(Redis等)實現分散式鎖;3、基於Zookeeper實現分散式鎖
基於Redis實現分散式鎖的傳送門:redis分散式鎖
那要怎麼解決呢?沒錯,就是使用第一種方式:基於資料庫實現分散式鎖
本文給出一種springboot整合shedlock的解決方案,以及對shedlock的大致實現原理的原始碼解析
1.引入相關jar包
<dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>shedlock-spring</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>shedlock-provider-jdbc-template</artifactId> <version>2.2.1</version> </dependency>
2.資料庫新建表
CREATE TABLE `t_scheduled_lock` ( `NAME` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '任務名稱', `lock_until` timestamp(3) NULL DEFAULT NULL COMMENT '到期時間', `locked_at` timestamp(3) NULL DEFAULT NULL COMMENT '開始時間', `locked_by` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, PRIMARY KEY (`NAME`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分散式任務排程鎖表';
3.配置類
import net.javacrumbs.shedlock.core.LockProvider; import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider; import net.javacrumbs.shedlock.spring.ScheduledLockConfiguration; import net.javacrumbs.shedlock.spring.ScheduledLockConfigurationBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import javax.sql.DataSource; import java.time.Duration; @Configuration @EnableScheduling public class ScheduledConfig { @Bean public LockProvider lockProvider(DataSource dataSource) { //指定表名,會自動在資料庫中記錄任務執行日誌 return new JdbcTemplateLockProvider(dataSource,"t_scheduled_lock"); } @Bean public ScheduledLockConfiguration scheduledLockConfiguration(LockProvider lockProvider) { return ScheduledLockConfigurationBuilder .withLockProvider(lockProvider) .withPoolSize(10) .withDefaultLockAtMostFor(Duration.ofMinutes(30)) .build(); } }
4.在啟動類添加註解
//預設持有鎖時間=30分鐘 @EnableSchedulerLock(defaultLockAtMostFor = "PT30M")
5.新增定時任務方法
@RequestMapping(value ="/text", method = {RequestMethod.GET,RequestMethod.POST}) // @Scheduled(cron = "-") //表示不執行 @Scheduled(cron = "0 0 0 * * ?") //表示每天0點執行 @SchedulerLock(name = "test",lockAtLeastForString = "PT30M") public void test() { // do something... }
@SchedulerLock註解有以下幾個屬性
name:鎖名稱,鎖名稱必須是全域性唯一的;
lockAtMostFor(單位:毫秒):設定鎖的最大持有時間,為了解決如果持有鎖的節點掛了,無法釋放鎖,其他節點無法進行下一次任務,正常情況下任務執行完就會釋放鎖;
lockAtMostForString:鎖的最大持有時間的字串表達,例如“PT30M”表示為30分鐘;
lockAtLeastFor(單位:毫秒):保留鎖的最短時間。這個屬性是鎖的持有時間。設定了多少就一定會持有多長時間,再此期間,下一次任務執行時,其他節點包括它本身是不會執行任務的;
lockAtLeastForString:保留鎖的最短時間的字串表達,例如“PT30M”表示為30分鐘
拿我上面的程式碼解釋一下:鎖test設定了lockAtLeastFor或者lockAtLeastForString屬性的值為30分鐘,就意味這30分鐘內,text()方法不會執行第二遍,30分鐘後才會執行下一次的任務排程。
原始碼解析:
假如這個定時任務開發環境不執行,但是測試環境跟生產環境又執行
可以這麼設定@Scheduled(cron = "-"),可以將cron的值寫在不同環境的配置檔案中
因為在ScheduledAnnotationBeanPostProcessor.java中的processScheduled()方法提到
接下來我們看看@SchedulerLock是如何加鎖的
1.在DefaultLockingTaskExecutor.class中executeWithLock方法寫到,先執行加鎖lock,加鎖成功則執行任務,最後unlock釋放鎖,加鎖不成功則提示It's locked
public void executeWithLock(Task task, LockConfiguration lockConfig) throws Throwable { Optional<SimpleLock> lock = this.lockProvider.lock(lockConfig); if (lock.isPresent()) { try { logger.debug("Locked {}.", lockConfig.getName()); task.call(); } finally { ((SimpleLock)lock.get()).unlock(); logger.debug("Unlocked {}.", lockConfig.getName()); } } else { logger.debug("Not executing {}. It's locked.", lockConfig.getName()); } }
2.再看看lock()方法是怎麼寫的,StorageBasedLockProvider.class中有提到,意思是先去判斷當前的任務name是否新增過,如果沒有則執行insertRecord做新增操作,新增成功則儲存當前任務的名稱,若新增失敗,則執行updateRecord更新操作,最終返回新增或更新的結果。注意:這裡有可能新增或更新失敗,為什麼?因為我們是叢集部署的,定時任務同一時刻同時執行時,兩個不同的執行緒會同時去執行新增或更新,那麼問題來了,為什麼最終會只有一個執行緒成功呢?繼續看看insertRecord方法或者updateRecord方法內部怎麼寫的了
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) { boolean lockObtained = this.doLock(lockConfiguration); return lockObtained ? Optional.of(new StorageBasedLockProvider.StorageLock(lockConfiguration, this.storageAccessor)) : Optional.empty(); } protected boolean doLock(LockConfiguration lockConfiguration) { String name = lockConfiguration.getName(); if (!this.lockRecordRegistry.lockRecordRecentlyCreated(name)) { if (this.storageAccessor.insertRecord(lockConfiguration)) { this.lockRecordRegistry.addLockRecord(name); return true; } this.lockRecordRegistry.addLockRecord(name); } return this.storageAccessor.updateRecord(lockConfiguration); }
3.JdbcTemplateStorageAccessor.class
//insertRecord方法其實就是對資料庫插入一條鎖的記錄,包括鎖名稱,鎖到期時間,鎖開始時間以及加鎖的來源,然後將插入的結果返回
public boolean insertRecord(LockConfiguration lockConfiguration) { String sql = "INSERT INTO " + this.tableName + "(name, lock_until, locked_at, locked_by) VALUES(?, ?, ?, ?)"; return (Boolean)this.transactionTemplate.execute((status) -> { try { int insertedRows = this.jdbcTemplate.update(sql, (preparedStatement) -> { preparedStatement.setString(1, lockConfiguration.getName()); this.setTimestamp(preparedStatement, 2, lockConfiguration.getLockAtMostUntil()); this.setTimestamp(preparedStatement, 3, Instant.now()); preparedStatement.setString(4, this.getHostname()); }); return insertedRows > 0; } catch (DataIntegrityViolationException var5) { return false; } }); }
//updateRecord方法就是更新資料庫的記錄,注意where條件,它會更新(當前鎖名稱+鎖的到期時間<=當前時間)的記錄,並且返回更新結果
//舉個例子:有一條name=test的鎖記錄,lock_until鎖到期時間是中午12點半,當中午12點時定時任務test執行,執行update更新語句事資料庫會返回影響條數0條
//因為test這個鎖還沒到釋放的時間,所以updateRecord方法返回值是true public boolean updateRecord(LockConfiguration lockConfiguration) { String sql = "UPDATE " + this.tableName + " SET lock_until = ?, locked_at = ?, locked_by = ? WHERE name = ? AND lock_until <= ?"; return (Boolean)this.transactionTemplate.execute((status) -> { int updatedRows = this.jdbcTemplate.update(sql, (statement) -> { Instant now = Instant.now(); this.setTimestamp(statement, 1, lockConfiguration.getLockAtMostUntil());//鎖的到期時間 this.setTimestamp(statement, 2, now); statement.setString(3, this.getHostname()); statement.setString(4, lockConfiguration.getName()); this.setTimestamp(statement, 5, now); }); return updatedRows > 0; }); }
其實說到這裡,還是沒能解答我們上面提到的問題:為什麼最終會只有一個執行緒成功呢?看一下transactionTemplate.execute的原始碼你就懂了
@Override @Nullable public <T> T execute(TransactionCallback<T> action) throws TransactionException { Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) { return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action); } else { TransactionStatus status = this.transactionManager.getTransaction(this); T result; try { result = action.doInTransaction(status); } catch (RuntimeException | Error ex) { // Transactional code threw application exception -> rollback rollbackOnException(status, ex); throw ex; } catch (Throwable ex) { // Transactional code threw unexpected exception -> rollback rollbackOnException(status, ex); throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception"); } this.transactionManager.commit(status); return result; } }
可以看出,當A執行緒先執行新增或者更新資料時,會為這條資料新增事務並且為資料新增排他鎖,其他B、C、D等執行緒會一直等待A執行緒的事務處理完。當A處理完時,其他執行緒才能繼續對這條資料進行加鎖處理,發現加鎖不成功(因為已經有其他執行緒比它先行一步處理了),所以就會提示It's locked(上文有提到)
排他鎖的定義:若事務T對資料物件A加上X鎖,則只允許T讀取和修改A,其他任何事務都不能再對A加任何型別的鎖,直到T釋放A上的鎖。這就保證了其他事務在T釋放A上的鎖之前不能再讀取和修改A
執行緒加鎖完成、任務執行完成之後,下一步就是釋放鎖,看看它是怎麼釋放鎖的
public void unlock(final LockConfiguration lockConfiguration) {
//更新這條資料鎖的到期時間 final String sql = "UPDATE " + this.tableName + " SET lock_until = ? WHERE name = ?"; this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { JdbcTemplateStorageAccessor.this.jdbcTemplate.update(sql, (statement) -> { JdbcTemplateStorageAccessor.this.setTimestamp(statement, 1, lockConfiguration.getUnlockTime());//獲取解鎖時間 statement.setString(2, lockConfiguration.getName()); }); } }); }
//獲取解鎖時間
public Instant getUnlockTime() {
Instant now = Instant.now();//獲取當前時間
return this.lockAtLeastUntil.isAfter(now) ? this.lockAtLeastUntil : now;
}
lockAtLeastUntil的值就是上面提到的lockAtLeastFor或者lockAtLeastForString屬性經過換算後得到的時間
釋放鎖的邏輯就是:會用當前時間跟保留鎖的最短時間進行比較,如果當前時間小於lockAtLeastUntil,則繼續用lockAtLeastUntil更新資料(相當於沒更新),如果當前時間大於lockAtLeastUntil,則更新為當前時間(也就是從現在開始,這條資料的鎖已經到期了)。
舉個例子:資料庫有一條中午12點加鎖的記錄A,並且lockAtLeastFor或者lockAtLeastForString設定了30分鐘,就意味著A的釋放鎖時間=12點30分,也就是lockAtLeastUntil=12點30分 ,在當前時間12點20分執行緒執行到這裡想要釋放鎖,就會用12點20分跟12點30分進行比較發現結果是小於,則繼續用lockAtLeastUntil更新資料(相當於沒更新)
以上就是本人對shedlock實現分散式鎖理解的整個過程,若有理解不對還望指出,我們共同學習!