springboot整合elasticJob實戰(純代碼開發三種任務類型用法)以及分片系統詳解
一 springboot整合
介紹就不多說了,只有這個框架是當當網開源的,支持分布式調度,分布式系統中非常合適(兩個服務同時跑不會重復,並且可靈活配置分開分批處理數據,賊方便)!
這裏主要還是用到zookeeper,如果沒有zk環境,可以百度或者參考我之前的博客搭建
添加依賴,這裏有一點,如果是在springcloud中的話,需要排除自帶的curator依賴,因為cloud已經集成一些,會沖突:
1 <!-- elastic-job --> 2 <dependency> 3 <groupId>com.dangdang</groupId> 4View Code<artifactId>elastic-job-lite-core</artifactId> 5 <version>2.1.5</version> 6 <exclusions> 7 <exclusion> 8 <artifactId>curator-client</artifactId> 9 <groupId>org.apache.curator</groupId> 10</exclusion> 11 <exclusion> 12 <artifactId>curator-framework</artifactId> 13 <groupId>org.apache.curator</groupId> 14 </exclusion> 15 <exclusion> 16<artifactId>curator-recipes</artifactId> 17 <groupId>org.apache.curator</groupId> 18 </exclusion> 19 </exclusions> 20 </dependency> 21 <dependency> 22 <groupId>com.dangdang</groupId> 23 <artifactId>elastic-job-lite-spring</artifactId> 24 <version>2.1.5</version> 25 </dependency> 26 <dependency> 27 <groupId>org.apache.curator</groupId> 28 <artifactId>curator-framework</artifactId> 29 <version>2.10.0</version> 30 </dependency> 31 <dependency> 32 <groupId>org.apache.curator</groupId> 33 <artifactId>curator-client</artifactId> 34 <version>2.10.0</version> 35 </dependency> 36 <dependency> 37 <groupId>org.apache.curator</groupId> 38 <artifactId>curator-recipes</artifactId> 39 <version>2.10.0</version> 40 </dependency> 41 </dependencies>
然後就是配置zk註冊中心,分布式功能主要依賴這個,所有屬性都從yml中註入,這裏註意一點,可以把超時時間設置大一點:
@Configuration
public class ElasticRegCenterConfig {
/**
* 配置zookeeper註冊中心
*/
@Bean(initMethod = "init") // 需要配置init執行初始化邏輯
public ZookeeperRegistryCenter regCenter(
@Value("${regCenter.serverList}") final String serverList,
@Value("${regCenter.namespace}") final String namespace) {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
zookeeperConfiguration.setMaxRetries(3); //設置重試次數,可設置其他屬性
zookeeperConfiguration.setSessionTimeoutMilliseconds(500000); //設置會話超時時間,盡量大一點,否則項目無法正常啟動
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
}
然後就是配置job了,其實和spring的quartz配置都差不多,一個job類,一個調度類
這裏先貼我的yml配置,任務執行周期,分片個數都從這裏註入即可,分片使用後面單獨說明:
二 simplejob
job類:
@Component
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(shardingContext.getJobName()+"執行:"+
"分片參數:"+shardingContext.getShardingParameter()+
",當前分片項:"+shardingContext.getShardingItem()+
",time:"+ LocalDate.now());
}
}
配置類,這裏用到了一個工具方法,工具類放下面:
/**
* 配置MySimpleJob
*/
@Configuration
public class MySimpleJobConf {
@Autowired ZookeeperRegistryCenter regCenter;
@Autowired MySimpleJob mySimpleJob;
/**
* 配置任務調度: 參數: 任務
* zk註冊中心
* 任務詳情
*/
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(@Value("${mySimpleJob.cron}") final String cron, //yml註入
@Value("${mySimpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${mySimpleJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(mySimpleJob, regCenter,
ElasticJobUtils.getSimpleJobConfiguration(
mySimpleJob.getClass(),
cron,
shardingTotalCount,
shardingItemParameters)
//,new MyElasticJobListener() 可配置監聽器
);
}
}
工具類:
public class ElasticJobUtils {
/**
* 創建簡單任務詳細信息
*/
public static LiteJobConfiguration getSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, //任務類
final String cron, // 運行周期配置
final int shardingTotalCount, //分片個數
final String shardingItemParameters) { // 分片參數
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters).build()
, jobClass.getCanonicalName())
).overwrite(true).build();
}
/**
* 創建流式作業配置
*/
public static LiteJobConfiguration getDataFlowJobConfiguration(final Class<? extends DataflowJob> jobClass, //任務類
final String cron, // 運行周期配置
final int shardingTotalCount, //分片個數
final String shardingItemParameters,
final Boolean streamingProcess //是否是流式作業
) { // 分片參數
return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(
JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters).build()
// true為流式作業,除非fetchData返回數據為null或者size為0,否則會一直執行
// false 非流式,只會按配置時間執行一次
, jobClass.getCanonicalName(),streamingProcess)
).overwrite(true).build();
}
}
View Code
測試:
三 dataflowjob
job類:
@Component
public class MyDataFlowJob implements DataflowJob<String> {
@Override
public List<String> fetchData(ShardingContext shardingContext) { //抓取數據
System.out.println("---------獲取數據---------");
return Arrays.asList("1","2","3");
}
@Override
public void processData(ShardingContext shardingContext, List<String> list) {//處理數據
System.out.println("---------處理數據---------");
list.forEach(x-> System.out.println("數據處理:"+x));
}
}
配置類:
@Configuration
public class MyDataFlowJobConf {
@Autowired ZookeeperRegistryCenter regCenter;
@Autowired MyDataFlowJob myDataFlowJob;
/**
* 配置任務調度: 參數: 任務
* zk註冊中心
* 任務詳情
*/
@Bean(initMethod = "init")
public JobScheduler dataFlowJobScheduler(@Value("${myDataFlowJob.cron}") final String cron, //yml註入
@Value("${myDataFlowJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${myDataFlowJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(myDataFlowJob, regCenter,
ElasticJobUtils.getDataFlowJobConfiguration(
myDataFlowJob.getClass(),
cron,
shardingTotalCount,
shardingItemParameters,true)
//,new MyElasticJobListener() 可配置監聽器
);
}
}
測試:
需要註意一點流式作業如果數據不為空會一直跑
四 scriptjob
五 分片用法
分片的目的就是通過配置分片個數,讓不同的分片參數到不同的服務中去,比如配置了分片個數是2,那麽分片一會到服務一種,分片二到服務二中
項目匯中科根據分片參數來決定哪個服務處理哪些數據,比如 0=客戶甲乙,1=客戶乙,但是分片item是從1開始
分片算法默認是平均,可自定義,然後參數就是上面yml那種配置,比如2,就是 0=,1= 4就是0=,1=,2=,3=,兩個服務的話服務一就是0,1的參數,服務二就是2,3的參數,並且分片item是3,4
然後要註意一點的是,這個分片識別是根據ip的,也就是說同一臺電腦,跑兩個程序沒用,兩個程序都會全部執行,還是會重復
主要是這個分片保證分布式中處理數據不重復,分片也會轉移,即一個服務掛了之後,分片參數和item會自動轉移到剩下服務中
六 監聽器用法
六 容易踩的坑
一 配置類中配置bean的時候,方法名不要重復,否則會發現任務不跑,
二 測試分布式的時候,必須跑在ip不一樣的服務上,否則不會實現分片
三 我的版本再pom裏面,springboot版本是2.0.6,版本不一樣可能用法也有些區別
四 理論上xml更簡單,但是我個人比較喜歡代碼風格,哈哈
springboot整合elasticJob實戰(純代碼開發三種任務類型用法)以及分片系統詳解