1. 程式人生 > >分散式定時器的一些解決方案

分散式定時器的一些解決方案

開發十年,就只剩下這套架構體系了! >>>   

前言

之前寫SpringBoot時,有簡單介紹過分散式定時器的一些思路(SpringBoot | 第二十二章:定時任務的使用)。原來的專案本身使用dubbo實現了一個簡單的實現,目前專案遷移至SpringCloud後,原來的就不適用了,但基本原理都是差不多的,都是集中管理需要呼叫的api及排程等相關資訊。故本篇會簡單介紹下一些常見的分散式定時器的實現方案,還會編寫一個基於http呼叫的統一排程

專案,實現簡單的呼叫SpringCloud專案RESTful介面。

一些說明

本身Spring提供了Spring Task進行定時配置,基於註解和xml配置方式可實現簡單的定時器配置,再一些場景下,若在非單機模式下,部署了多個應用時,若不加以控制,很容易造成資料的錯誤問題。在之前編寫的文章中也有簡單的提及一些分散式解決方案,比如Quartz等,感謝的同學可點選:SpringBoot | 第二十二章:定時任務的使用,進行檢視,這裡就不再重複闡述了。

基於ShedLock實現輕量級分散式定時鎖

ShedLock是一個在分散式環境中使用的定時任務框架,用於解決在分散式環境中的多個例項的相同定時任務在同一時間點重複執行的問題,解決思路是通過對公用的資料庫中的某個表進行記錄和加鎖,使得同一時間點只有第一個執行定時任務併成功在資料庫表中寫入相應記錄的節點能夠成功執行而其他節點直接跳過該任務。簡單來說,ShedLock

本身只做一件事情:保證一個任務最多同時執行一次。所以如官網所說的,ShedLock不是一個分散式排程器,只是一個鎖!

ShedLock

注意:ShedLock支援MongoRedisHazelcastZooKeeper以及任何帶有JDBC驅動程式的東西。本例子為了方便,直接使用了redis進行示例,若本身基於jdbc等,可直接參考官網給出的提示:https://github.com/lukas-krecan/ShedLock#jdbctemplate. 建立對應的表結構。

CREATE TABLE shedlock(
    name VARCHAR(64), 
    lock_until TIMESTAMP(3) NULL, 
    locked_at TIMESTAMP(3) NULL, 
    locked_by  VARCHAR(255), 
    PRIMARY KEY (name)
) 

mark

整合示例

建立工程名:java-shedlock-demo

0.maven依賴(這裡使用當前最新版本及使用redis進行實現),基於SpringBoot 2.0.3.RELEASE版本。

        <dependency>
            <groupId>net.javacrumbs.shedlock</groupId>
            <artifactId>shedlock-spring</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>net.javacrumbs.shedlock</groupId>
            <artifactId>shedlock-provider-redis-spring</artifactId>
            <version>2.3.0</version>
        </dependency>
<dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--spring2.0整合redis所需common-pool2 -->
        <!-- 必須加上,jedis依賴此 -->
        <!-- spring boot 2.0 的操作手冊有標註 大家可以去看看 地址是:https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/htmlsingle/ -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

1.配置LockProvider,同時開啟@EnableSchedulerLock註解。

ShedLockRedisConfig.java

/** 
*
* @ClassName   類名:ShedLockRedisConfig 
* @Description 功能說明:redis配置
* <p>
* TODO
*</p>
************************************************************************
* @date        建立日期:2019年3月3日
* @author      建立人:oKong
* @version     版本號:V1.0
*<p>
***************************修訂記錄*************************************
* 
*   2019年3月3日   oKong   建立該類功能。
*
***********************************************************************
*</p>
*/
/** 
*
* @ClassName   類名:ShedLockRedisConfig 
* @Description 功能說明:redis配置
* <p>
* TODO
*</p>
************************************************************************
* @date        建立日期:2019年3月3日
* @author      建立人:oKong
* @version     版本號:V1.0
*<p>
***************************修訂記錄*************************************
* 
*   2019年3月3日   oKong   建立該類功能。
*
***********************************************************************
*</p>
*/
@Configuration
//defaultLockAtMostFor 指定在執行節點結束時應保留鎖的預設時間使用ISO8601 Duration格式
//作用就是在被加鎖的節點掛了時,無法釋放鎖,造成其他節點無法進行下一任務
//這裡預設30s
//關於ISO8601 Duration格式用的不到,具體可上網查詢下相關資料,應該就是一套規範,規定一些時間表達方式
@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
public class ShedLockRedisConfig {
    
    //正常情況下 應該按實際環境來區分的
    //這裡為了方便 寫成test便於是測試
//    @Value("${spring.profiles.active}")
    String env = "test";
    
    @Bean
    public LockProvider lockProvider(RedisConnectionFactory connectionFactory) {
        //環境變數 -需要區分不同環境避免衝突,如dev環境和test環境,兩者都部署時,只有一個例項進行,此時會造成相關環境未啟動情況
        return new RedisLockProvider(connectionFactory, env);
    }
}

2.編寫一個簡單定時任務。

/** 
*
* @ClassName   類名:SimpleTask 
* @Description 功能說明:
* <p>
* TODO
*</p>
************************************************************************
* @date        建立日期:2019年3月3日
* @author      建立人:oKong
* @version     版本號:V1.0
*<p>
***************************修訂記錄*************************************
* 
*   2019年3月3日   oKong   建立該類功能。
*
***********************************************************************
*</p>
*/
@Component
@Slf4j
public class SimpleTask {
    
    //區分服務
    @Value("${server.port}")
    String port;
    
    //為了方便測試 設定cron表示式 
    @Scheduled(cron = "*/5 * * * * ?")
    //lockAtLeastFor:保證在設定的期間類不執行多次任務,單位是毫秒,此處可以根據實際任務執行情況進行設定,
    //簡單來說,一個每15分鐘執行的任務,若每次任務執行的時間為幾分鐘,則可以設定lockAtLeastFor大於其最大估計最大執行時間
    //避免一次任務未執行完,下一個定時任務又啟動了。
    //任務執行完,會自動釋放鎖。
    @SchedulerLock(name="simpleTask",lockAtLeastFor = 1*1000)
    public void getCurrentDate() {
        log.info("埠({}),Scheduled定時任務執行:{}", port, new Date());
    }
}

3.編寫啟動類開啟定時任務功能,及配置檔案。

/**
 *
 * @ClassName 類名:ShedLockApplication
 * @Description 功能說明:啟動類
 *              <p>
 *              TODO
 *              </p>
 ************************************************************************
 * @date 建立日期:2019年3月3日
 * @author 建立人:oKong
 * @version 版本號:V1.0
 *          <p>
 ***************************          修訂記錄*************************************
 * 
 *          2019年3月3日 oKong 建立該類功能。
 *
 ***********************************************************************
 *          </p>
 */
@SpringBootApplication
@EnableScheduling // 開啟定時任務
@Slf4j
public class ShedLockApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ShedLockApplication.class, args);
        log.info("java-shedlock-demo啟動!");
    }

}

application.properties

server.port=8001

# REDIS (RedisProperties)
# Redis資料庫索引(預設為0)
spring.redis.database=0
# Redis伺服器地址
spring.redis.host=127.0.0.1
# Redis伺服器連線埠
spring.redis.port=6379  
# Redis伺服器連線密碼(預設為空)
#spring.redis.password=
# 連線池最大連線數(使用負值表示沒有限制)
spring.redis.lettuce.pool.max-active=8  
# 連線池最大阻塞等待時間(使用負值表示沒有限制)Duration
spring.redis.lettuce.pool.max-wait=-1ms 
# 連線池中的最大空閒連線
spring.redis.lettuce.pool.max-idle=8  
# 連線池中的最小空閒連線
spring.redis.lettuce.pool.min-idle=0  
# 連線超時時間-Duration 不能設定為0 一般上設定個200ms
spring.redis.timeout=200ms

4.利用多環境啟動多個服務(8001,8002),檢視是否正常執行。 8001服務

2019-03-03 23:36:30.070  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:36:30 CST 2019
2019-03-03 23:36:35.005  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:36:35 CST 2019
2019-03-03 23:36:40.002  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:36:40 CST 2019
2019-03-03 23:36:45.003  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:36:45 CST 2019
2019-03-03 23:36:50.003  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:36:50 CST 2019
2019-03-03 23:36:55.006  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:36:55 CST 2019
2019-03-03 23:37:05.002  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:37:05 CST 2019
2019-03-03 23:37:15.002  INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8001),Scheduled定時任務執行:Mon Mar 03 23:37:15 CST 2019

8002服務

2019-03-03 23:37:00.012  INFO 24492 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8002),Scheduled定時任務執行:Mon Mar 03 23:37:00 CST 2019
2019-03-03 23:37:10.007  INFO 24492 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask    : 埠(8002),Scheduled定時任務執行:Mon Mar 03 23:37:10 CST 2019

通過日誌輸出,可以看出每次任務執行時,只有一個例項在執行。具體哪個服務,看誰獲取到鎖了。

SchedulerLock註解說明

@SchedulerLock註解一共支援五個引數,分別是

  • name:用來標註一個定時服務的名字,被用於寫入資料庫作為區分不同服務的標識,如果有多個同名定時任務則同一時間點只有一個執行成功
  • lockAtMostFor:成功執行任務的節點所能擁有獨佔鎖的最長時間,單位是毫秒ms
  • lockAtMostForString:成功執行任務的節點所能擁有的獨佔鎖的最長時間的字串表達,例如“PT14M”表示為14分鐘
  • lockAtLeastFor:成功執行任務的節點所能擁有獨佔所的最短時間,單位是毫秒ms
  • lockAtLeastForString:成功執行任務的節點所能擁有的獨佔鎖的最短時間的字串表達,例如“PT14M”表示為14分鐘

兩種整合模式

按官網介紹,其有兩種模式:TaskSchedulerMethod代理,具體的可以檢視官網介紹,這裡就不過多闡述了。簡單來說,都是使用AOP代理機制,一個是代理了taskScheduler,一個是代理了被註解了SchedulerLock具體的方法。可以具體場景進行設定,比如記錄定時任務日誌等。這裡需要注意,使用Method代理時,其不依賴於Spring環境,但普通呼叫此方法時也會進行鎖定的,需要注意,而且目前只支援void的方法。

Method

TaskScheduler代理時序圖

TaskScheduler proxy

Method代理時序圖

Method

基於統一排程中心實現任務呼叫

統一排程中心:一個管理定時任務配置及發起任務執行的一個服務。簡單來說,就是通過維護需要執行任務的服務列表,如api地址dubbo服務資訊等,通過配置的定時配置進行服務呼叫。從而避免了定時任務重複問題,同時也能利用註冊中心實現負載均衡動態呼叫對應任務。

技術選型

  1. 核心框架:SpringBoot 2.0.3.RELEASESpringcloud Finchley.SR1
  2. 任務排程:Quartz
  3. 持久層框架:MyBatis + MyBatis-Plus
  4. 資料庫:mysql

題外話:原本想延續原先SpringBoot1.5版本進行開發,後面考慮此服務相對簡單,所以直接嘗試使用webflux進行服務開發,順便也學習學習WebFlux相關操作。

資料庫指令碼

CREATE TABLE `sched_config` (
  `id` bigint(20) NOT NULL,
  `name` varchar(200) DEFAULT NULL COMMENT '任務名稱',
  `target_service_type` varchar(2) DEFAULT NULL COMMENT '目標任務型別:01 springcloud 02 http 03 dubbo',
  `targer_service` varchar(50) DEFAULT NULL COMMENT '目標服務:可為服務地址,或者dubbo服務名',
  `cron_config` varchar(20) DEFAULT NULL COMMENT 'cron表示式',
  `status` varchar(1) DEFAULT NULL COMMENT '狀態:1啟用 0 停用',
  `remark` varchar(200) DEFAULT NULL COMMENT '備註說明',
  `extra_dubbo_group` varchar(50) DEFAULT NULL COMMENT 'dubbo組名',
  `extra_dubbo_version` varchar(50) DEFAULT NULL COMMENT 'dubbo服務版本資訊',
  `gmt_create` datetime DEFAULT NULL COMMENT '建立時間',
  `gmt_modified` datetime DEFAULT NULL COMMENT '修改時間',
  PRIMARY KEY (`id`)
)

相關類說明

quartz工廠類(QuartzJobFactory)

為了使得自定義的job能主動注入spring的相關bean,需要額外實現此工廠類,方便呼叫。當然也可以直接動態獲取bean例項了。

public class QuartzJobFactory extends AdaptableJobFactory {
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        // 呼叫父類的方法
        Object jobInstance = super.createJobInstance(bundle);
        //主動注入
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

同時,配置SchedulerFactoryBean,設定其工廠類。 QuartzConfig.java

@Configuration
public class QuartzConfig {
    
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(){
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setOverwriteExistingJobs(true);
        // 延時啟動
        factory.setStartupDelay(20);
        // 自定義Job Factory,用於Spring注入
        factory.setJobFactory(quartzJobFactory());
        return factory;
    }
    
    @Bean
    public QuartzJobFactory quartzJobFactory() {
        return new QuartzJobFactory();
    }
}

初始化任務(InitJob)

在服務啟動時,啟動開啟配置的任務,同時設定其定時器。

@Component
@Slf4j
public class InitJob {

    @Autowired
    ISchedConfigService schedConfigService;
    
    @Autowired
    Scheduler scheduler;
    
    /**
     * 
     * <p>函式名稱:  initJob      </p>
     * <p>功能說明: 啟動時,進行任務初始化操作,即啟動相應的任務定時器
     *
     * </p>
     *<p>引數說明:</p>
     *
     * @date   建立時間:2019年3月4日
     * @author 作者:oKong
     */
    @PostConstruct
    public void initJob() {
        log.info("初始化任務開始......");
        //獲取所有啟用任務
        EntityWrapper<SchedConfig> qryWrapper = new EntityWrapper<>();
        qryWrapper.eq(SchedConfig.STATUS, "1");
        List<SchedConfig> schedConfigList = schedConfigService.selectList(qryWrapper);
        if(schedConfigList == null || schedConfigList.isEmpty()) {
            log.warn("暫無定時任務");
            return;
        }
        for(SchedConfig config : schedConfigList) {
            String name = config.getName();//任務名稱
            JobDetail jobDetail = newJob(TaskJob.class).withIdentity(name, "okongJobGroup").build();
            //設定執行時引數
            JobDataMap jobDataMap = jobDetail.getJobDataMap();
            jobDataMap.put("config", config);
            //建立trigger觸發器
            Trigger trigger = newTrigger()
                    .withIdentity(name, "okongTriggerGroup")
                    .withSchedule(cronSchedule(config.getCronConfig())).build();
            
            //啟動定時器
            try {
                scheduler.scheduleJob(jobDetail, trigger);
                log.info("任務[{}]啟動成功", name);
            } catch (SchedulerException e) {
                log.error("任務[{}]啟動失敗,{}", name,e.getMessage());
            }
        }
        log.info("初始化任務結束......");
    }
}

任務類(TaskJob)

實現具體任務的執行和呼叫。利用WebClient實現http服務的呼叫。暫時未實現dubbo的呼叫,後期再補充。

  1. 配置普通WebClient和具有負載均衡的webClient,主要是考慮到存在訪問SpringCloud服務和普通http的需求,原先使用負載均衡的restTemplate時,訪問普通的http請求是無法訪問的,不知道webClient是否也是一樣,這裡直接簡單粗暴的直接設定了兩個webClient
@Configuration
public class WebClientConfig {

    /**
     * 
     * <p>函式名稱:  loadBalancedWebClientBuilder     </p>
     * <p>功能說明:  具有負載均衡的WebClient
     *
     * </p>
     *<p>引數說明:</p>
     * @return
     *
     * @date   建立時間:2019年3月5日
     * @author 作者:oKong
     */
    @Bean("balanceWebClient")
    @LoadBalanced
    public WebClient.Builder loadBalancedWebClientBuilder() {
        return WebClient.builder();
    }
    
    /**
     * 
     * <p>函式名稱: webClientBuilder       </p>
     * <p>功能說明:普通WebClient
     *
     * </p>
     *<p>引數說明:</p>
     * @return
     *
     * @date   建立時間:2019年3月5日
     * @author 作者:oKong
     */
    @Bean("webClient")
    public WebClient.Builder webClientBuilder() {
        return WebClient.builder();
    }
}
  1. 具體執行任務類,根據不同的型別,進行不同的呼叫。
//@DisallowConcurrentExecution 說明在一個任務執行時,另一個定時點來臨時不會執行任務,比如一個定時是間隔3分鐘一次,但任務執行了5分鐘,此時會等上個任務完成後再執行下一次定時任務
@DisallowConcurrentExecution
@Slf4j
public class TaskJob implements org.quartz.Job{
    
    /**
     * spring5中 非同步restTemplate已被標記位作廢了
     * 這裡嘗試使用webClient
     */ 
    @Autowired
    @Qualifier("balanceWebClient")
    private WebClient.Builder balanceWebClientBuilder;
    
    @Autowired
    @Qualifier("webClient")
    private WebClient.Builder webClientBuilder;
    
    
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        //執行方法
        //獲取任務實體物件
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        SchedConfig schedConfig = (SchedConfig) jobDataMap.get("config");
        log.info("執行定時任務:{}", schedConfig);
        //根據不同型別進行不同的處理邏輯
        Mono<String> monoRst = null;
        switch (schedConfig.getTargetServiceType()) {
        case "01":
            //springcloud方式
            //利用loadBalancerClient 獲取實際服務地址
            monoRst = balanceWebClientBuilder.build().post().uri(schedConfig.getTargerService()).retrieve().bodyToMono(String.class);
             break;
        case "02":
            //普通http方式
            monoRst =webClientBuilder.build().post().uri(schedConfig.getTargerService()).retrieve().bodyToMono(String.class);//無引數
            break;
        case "03":
            //dubbo方式
            //TODO 暫時未實現
            break;
        default:

        }
        if(monoRst != null) {
          log.info("呼叫服務結果為:{}", monoRst.block());
        }
    }

}

服務效果

為了測試,簡單改造了java-shedlock-demoSpringCloud專案,具體就不貼程式碼了,可直接下載相應工程進行檢視。

資料庫配置: 資料庫配置

服務啟動,控制檯輸出: 對比結果

大家可自行測試下,這裡只是簡單的進行控制檯輸出。

參考資料

總結

本文主要簡單介紹了一些分散式定時任務的解決方案。對於ShedLock大部分的分散式場景應該是夠用了,特別場景下可能需要注意,實際情況實際解決了。而對於後一種,統一排程服務而言,本身只是個簡單的示例,後續會考慮加入dubbo的支援,及一些其他的特性,如呼叫反饋,失敗次數、動態新增任務、修改定時任務,移除定時任務等等,目前只是簡單的為了滿足業務需要,後需要會進行優化的,目前就且看吧,一些異常之類的都還沒有進行處理⊙﹏⊙‖∣。

完整示例: 統一排程中心:okong-scheduler schedLock-demo: https://github.com/xie19900123/java-learning/tree/master/java-shedlock-demo

原文地址:https://blog.lqdev.cn/2019/03/06/%E6%97%A5%E5%B8%B8%E7%A7%AF%E7%B4%A