elastic-job+zookeeper實現分散式定時任務排程的使用(springboot版本)
阿新 • • 發佈:2019-02-07
總體思路,要確認一個定時任務需要一個cron表示式+jobDetail;
現在要讓實現定時任務的協調,則就讓zookeeper,簡單說就是需要3要素,zk物件+cron+jobDetail;
總的專案結構
1、maven引入依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
application.properties如下:
server.port=8766 spring.application.name=scheduler-service regCenter.serverList = localhost:2181 regCenter.namespace = elastic-job-lite-springboot stockJob.cron = 0/5 * * * * ? stockJob.shardingTotalCount = 2 stockJob.shardingItemParameters = 0=Chengdu0,1=Chengdu1
其中
stockJob.cron為定時任務的cron表示式;
stockJob.shardingTotalCount為任務的分數量(即同時同時開幾個定時任務);
stockJob.shardingItemParameters為任務分片攜帶的引數;
要素1-zookeeper 建立一個bean,用來配置zk
package com.example.elasticjobdemo.config; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0") public class JobRegistryCenterConfig { @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
建立一個任務,jobdetail
package com.example.elasticjobdemo.config; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import org.springframework.beans.factory.annotation.Autowired; public class StockSimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " + "當前分片項: %s.當前引數: %s,"+ "當前任務名稱: %s.當前任務引數: %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
再建立一個類,把3要素連線起來
package com.example.elasticjobdemo.config; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class StockJobConfig { @Autowired private JobRegistryCenterConfig jobRegistryCenterConfig; @Autowired private ZookeeperRegistryCenter regCenter; public StockJobConfig() { } @Bean public SimpleJob stockJob(){ return new StockSimpleJob(); } @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${stockJob.cron}") final String cron, @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount, @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) { return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters)); } /** *@Description 任務配置類 */ private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters){ return LiteJobConfiguration .newBuilder( new SimpleJobConfiguration( JobCoreConfiguration.newBuilder( jobClass.getName(),cron,shardingTotalCount) .shardingItemParameters(shardingItemParameters) .build() ,jobClass.getCanonicalName() ) ) .overwrite(true) .build(); } }
boot任務啟動後的效果(需要先啟動zk,這裡我填了分片數為2)