1. 程式人生 > 其它 >分散式任務排程-定時任務重複執行解決方案

分散式任務排程-定時任務重複執行解決方案

最近一期需求遇到這麼個問題,需要寫一個定時任務,專案是叢集部署的並且伺服器資源有限沒有redis、Zookeeper等。

我們都知道,當我們服務端在部署叢集模式時,會出現所有的定時任務在各自的節點處都會執行一遍,這顯然是不符合要求的,如何解決這個問題?那就是引入分散式鎖

分散式鎖三種實現方式:1、基於資料庫實現分散式鎖;2、基於快取(Redis等)實現分散式鎖;3、基於Zookeeper實現分散式鎖

基於Redis實現分散式鎖的傳送門:redis分散式鎖

那要怎麼解決呢?沒錯,就是使用第一種方式:基於資料庫實現分散式鎖

本文給出一種springboot整合shedlock的解決方案,以及對shedlock的大致實現原理的原始碼解析

 

1.引入相關jar包

<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-spring</artifactId>
    <version>2.2.1</version>
</dependency>
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-provider-jdbc-template</artifactId>
    <version>2.2.1</version>
</dependency>

2.資料庫新建表

CREATE TABLE `t_scheduled_lock` (
  `NAME` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '任務名稱',
  `lock_until` timestamp(3) NULL DEFAULT NULL COMMENT '到期時間',
  `locked_at` timestamp(3) NULL DEFAULT NULL COMMENT '開始時間',
  `locked_by` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`NAME`)
) ENGINE
=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分散式任務排程鎖表';

3.配置類

import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.ScheduledLockConfiguration;
import net.javacrumbs.shedlock.spring.ScheduledLockConfigurationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

import javax.sql.DataSource;
import java.time.Duration;

@Configuration
@EnableScheduling
public class ScheduledConfig {

    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
        //指定表名,會自動在資料庫中記錄任務執行日誌
        return new JdbcTemplateLockProvider(dataSource,"t_scheduled_lock");
    }

    @Bean
    public ScheduledLockConfiguration scheduledLockConfiguration(LockProvider lockProvider) {
        return ScheduledLockConfigurationBuilder
                .withLockProvider(lockProvider)
                .withPoolSize(10)
                .withDefaultLockAtMostFor(Duration.ofMinutes(30))
                .build();
    }

}

4.在啟動類添加註解

//預設持有鎖時間=30分鐘
@EnableSchedulerLock(defaultLockAtMostFor = "PT30M")

5.新增定時任務方法

    @RequestMapping(value ="/text", method = {RequestMethod.GET,RequestMethod.POST})
//    @Scheduled(cron = "-") //表示不執行
    @Scheduled(cron = "0 0 0 * * ?") //表示每天0點執行
    @SchedulerLock(name = "test",lockAtLeastForString = "PT30M")
    public void test() {
        // do something...
    }

@SchedulerLock註解有以下幾個屬性

name:鎖名稱,鎖名稱必須是全域性唯一的;

lockAtMostFor(單位:毫秒):設定鎖的最大持有時間,為了解決如果持有鎖的節點掛了,無法釋放鎖,其他節點無法進行下一次任務,正常情況下任務執行完就會釋放鎖;

lockAtMostForString:鎖的最大持有時間的字串表達,例如“PT30M”表示為30分鐘;

lockAtLeastFor(單位:毫秒):保留鎖的最短時間。這個屬性是鎖的持有時間。設定了多少就一定會持有多長時間,再此期間,下一次任務執行時,其他節點包括它本身是不會執行任務的;

lockAtLeastForString:保留鎖的最短時間的字串表達,例如“PT30M”表示為30分鐘

拿我上面的程式碼解釋一下:鎖test設定了lockAtLeastFor或者lockAtLeastForString屬性的值為30分鐘,就意味這30分鐘內,text()方法不會執行第二遍,30分鐘後才會執行下一次的任務排程。

原始碼解析:

假如這個定時任務開發環境不執行,但是測試環境跟生產環境又執行

可以這麼設定@Scheduled(cron = "-"),可以將cron的值寫在不同環境的配置檔案中 

因為在ScheduledAnnotationBeanPostProcessor.java中的processScheduled()方法提到

 

接下來我們看看@SchedulerLock是如何加鎖的

1.在DefaultLockingTaskExecutor.class中executeWithLock方法寫到,先執行加鎖lock,加鎖成功則執行任務,最後unlock釋放鎖,加鎖不成功則提示It's locked

    public void executeWithLock(Task task, LockConfiguration lockConfig) throws Throwable {
        Optional<SimpleLock> lock = this.lockProvider.lock(lockConfig);
        if (lock.isPresent()) {
            try {
                logger.debug("Locked {}.", lockConfig.getName());
                task.call();
            } finally {
                ((SimpleLock)lock.get()).unlock();
                logger.debug("Unlocked {}.", lockConfig.getName());
            }
        } else {
            logger.debug("Not executing {}. It's locked.", lockConfig.getName());
        }

    }

2.再看看lock()方法是怎麼寫的,StorageBasedLockProvider.class中有提到,意思是先去判斷當前的任務name是否新增過,如果沒有則執行insertRecord做新增操作,新增成功則儲存當前任務的名稱,若新增失敗,則執行updateRecord更新操作,最終返回新增或更新的結果。注意:這裡有可能新增或更新失敗,為什麼?因為我們是叢集部署的,定時任務同一時刻同時執行時,兩個不同的執行緒會同時去執行新增或更新,那麼問題來了,為什麼最終會只有一個執行緒成功呢?繼續看看insertRecord方法或者updateRecord方法內部怎麼寫的了

public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
    boolean lockObtained = this.doLock(lockConfiguration);
    return lockObtained ? Optional.of(new StorageBasedLockProvider.StorageLock(lockConfiguration, this.storageAccessor)) : Optional.empty();
}

protected boolean doLock(LockConfiguration lockConfiguration) {
    String name = lockConfiguration.getName();
    if (!this.lockRecordRegistry.lockRecordRecentlyCreated(name)) {
        if (this.storageAccessor.insertRecord(lockConfiguration)) {
            this.lockRecordRegistry.addLockRecord(name);
            return true;
        }

        this.lockRecordRegistry.addLockRecord(name);
    }

    return this.storageAccessor.updateRecord(lockConfiguration);
}

3.JdbcTemplateStorageAccessor.class

//insertRecord方法其實就是對資料庫插入一條鎖的記錄,包括鎖名稱,鎖到期時間,鎖開始時間以及加鎖的來源,然後將插入的結果返回
public
boolean insertRecord(LockConfiguration lockConfiguration) { String sql = "INSERT INTO " + this.tableName + "(name, lock_until, locked_at, locked_by) VALUES(?, ?, ?, ?)"; return (Boolean)this.transactionTemplate.execute((status) -> { try { int insertedRows = this.jdbcTemplate.update(sql, (preparedStatement) -> { preparedStatement.setString(1, lockConfiguration.getName()); this.setTimestamp(preparedStatement, 2, lockConfiguration.getLockAtMostUntil()); this.setTimestamp(preparedStatement, 3, Instant.now()); preparedStatement.setString(4, this.getHostname()); }); return insertedRows > 0; } catch (DataIntegrityViolationException var5) { return false; } }); }
//updateRecord方法就是更新資料庫的記錄,注意where條件,它會更新(當前鎖名稱+鎖的到期時間<=當前時間)的記錄,並且返回更新結果
//舉個例子:有一條name=test的鎖記錄,lock_until鎖到期時間是中午12點半,當中午12點時定時任務test執行,執行update更新語句事資料庫會返回影響條數0條
//因為test這個鎖還沒到釋放的時間,所以updateRecord方法返回值是true
public boolean updateRecord(LockConfiguration lockConfiguration) { String sql = "UPDATE " + this.tableName + " SET lock_until = ?, locked_at = ?, locked_by = ? WHERE name = ? AND lock_until <= ?"; return (Boolean)this.transactionTemplate.execute((status) -> { int updatedRows = this.jdbcTemplate.update(sql, (statement) -> { Instant now = Instant.now(); this.setTimestamp(statement, 1, lockConfiguration.getLockAtMostUntil());//鎖的到期時間 this.setTimestamp(statement, 2, now); statement.setString(3, this.getHostname()); statement.setString(4, lockConfiguration.getName()); this.setTimestamp(statement, 5, now); }); return updatedRows > 0; }); }

其實說到這裡,還是沒能解答我們上面提到的問題:為什麼最終會只有一個執行緒成功呢?看一下transactionTemplate.execute的原始碼你就懂了

@Override
@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

    if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
        return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
    }
    else {
        TransactionStatus status = this.transactionManager.getTransaction(this);
        T result;
        try {
            result = action.doInTransaction(status);
        }
        catch (RuntimeException | Error ex) {
            // Transactional code threw application exception -> rollback
            rollbackOnException(status, ex);
            throw ex;
        }
        catch (Throwable ex) {
            // Transactional code threw unexpected exception -> rollback
            rollbackOnException(status, ex);
            throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
        }
        this.transactionManager.commit(status);
        return result;
    }
}

可以看出,當A執行緒先執行新增或者更新資料時,會為這條資料新增事務並且為資料新增排他鎖,其他B、C、D等執行緒會一直等待A執行緒的事務處理完。當A處理完時,其他執行緒才能繼續對這條資料進行加鎖處理,發現加鎖不成功(因為已經有其他執行緒比它先行一步處理了),所以就會提示It's locked(上文有提到)

排他鎖的定義:若事務T對資料物件A加上X鎖,則只允許T讀取和修改A,其他任何事務都不能再對A加任何型別的鎖,直到T釋放A上的鎖。這就保證了其他事務在T釋放A上的鎖之前不能再讀取和修改A

執行緒加鎖完成、任務執行完成之後,下一步就是釋放鎖,看看它是怎麼釋放鎖的

public void unlock(final LockConfiguration lockConfiguration) {
     //更新這條資料鎖的到期時間
final String sql = "UPDATE " + this.tableName + " SET lock_until = ? WHERE name = ?"; this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { JdbcTemplateStorageAccessor.this.jdbcTemplate.update(sql, (statement) -> { JdbcTemplateStorageAccessor.this.setTimestamp(statement, 1, lockConfiguration.getUnlockTime());//獲取解鎖時間 statement.setString(2, lockConfiguration.getName()); }); } }); }

//獲取解鎖時間
public
Instant getUnlockTime() {
Instant now = Instant.now();//獲取當前時間
return this.lockAtLeastUntil.isAfter(now) ? this.lockAtLeastUntil : now;
}


lockAtLeastUntil的值就是上面提到的lockAtLeastFor或者lockAtLeastForString屬性經過換算後得到的時間

釋放鎖的邏輯就是:會用當前時間跟保留鎖的最短時間進行比較,如果當前時間小於lockAtLeastUntil,則繼續用lockAtLeastUntil更新資料(相當於沒更新),如果當前時間大於lockAtLeastUntil,則更新為當前時間(也就是從現在開始,這條資料的鎖已經到期了)。

舉個例子:資料庫有一條中午12點加鎖的記錄A,並且lockAtLeastFor或者lockAtLeastForString設定了30分鐘,就意味著A的釋放鎖時間=12點30分,也就是lockAtLeastUntil=12點30分 ,在當前時間12點20分執行緒執行到這裡想要釋放鎖,就會用12點20分跟12點30分進行比較發現結果是小於,則繼續用lockAtLeastUntil更新資料(相當於沒更新)

 

以上就是本人對shedlock實現分散式鎖理解的整個過程,若有理解不對還望指出,我們共同學習!