1. 程式人生 > >springboot整合elasticJob實戰(純代碼開發三種任務類型用法)以及分片系統詳解

springboot整合elasticJob實戰(純代碼開發三種任務類型用法)以及分片系統詳解

oid frame ima 時間設置 curator onclick 支持 pen 博客搭建

一 springboot整合

介紹就不多說了,只有這個框架是當當網開源的,支持分布式調度,分布式系統中非常合適(兩個服務同時跑不會重復,並且可靈活配置分開分批處理數據,賊方便)!

這裏主要還是用到zookeeper,如果沒有zk環境,可以百度或者參考我之前的博客搭建

添加依賴,這裏有一點,如果是在springcloud中的話,需要排除自帶的curator依賴,因為cloud已經集成一些,會沖突:

技術分享圖片
 1  <!-- elastic-job -->
 2         <dependency>
 3             <groupId>com.dangdang</groupId>
 4
<artifactId>elastic-job-lite-core</artifactId> 5 <version>2.1.5</version> 6 <exclusions> 7 <exclusion> 8 <artifactId>curator-client</artifactId> 9 <groupId>org.apache.curator</groupId> 10
</exclusion> 11 <exclusion> 12 <artifactId>curator-framework</artifactId> 13 <groupId>org.apache.curator</groupId> 14 </exclusion> 15 <exclusion> 16
<artifactId>curator-recipes</artifactId> 17 <groupId>org.apache.curator</groupId> 18 </exclusion> 19 </exclusions> 20 </dependency> 21 <dependency> 22 <groupId>com.dangdang</groupId> 23 <artifactId>elastic-job-lite-spring</artifactId> 24 <version>2.1.5</version> 25 </dependency> 26 <dependency> 27 <groupId>org.apache.curator</groupId> 28 <artifactId>curator-framework</artifactId> 29 <version>2.10.0</version> 30 </dependency> 31 <dependency> 32 <groupId>org.apache.curator</groupId> 33 <artifactId>curator-client</artifactId> 34 <version>2.10.0</version> 35 </dependency> 36 <dependency> 37 <groupId>org.apache.curator</groupId> 38 <artifactId>curator-recipes</artifactId> 39 <version>2.10.0</version> 40 </dependency> 41 </dependencies>
View Code

然後就是配置zk註冊中心,分布式功能主要依賴這個,所有屬性都從yml中註入,這裏註意一點,可以把超時時間設置大一點:

@Configuration
public class ElasticRegCenterConfig {
    /**
     * 配置zookeeper註冊中心
     */
    @Bean(initMethod = "init")  // 需要配置init執行初始化邏輯
    public ZookeeperRegistryCenter regCenter(
            @Value("${regCenter.serverList}") final String serverList,
            @Value("${regCenter.namespace}") final String namespace) {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
        zookeeperConfiguration.setMaxRetries(3); //設置重試次數,可設置其他屬性
        zookeeperConfiguration.setSessionTimeoutMilliseconds(500000); //設置會話超時時間,盡量大一點,否則項目無法正常啟動
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}

然後就是配置job了,其實和spring的quartz配置都差不多,一個job類,一個調度類

這裏先貼我的yml配置,任務執行周期,分片個數都從這裏註入即可,分片使用後面單獨說明:

技術分享圖片

二 simplejob

job類:

@Component
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(shardingContext.getJobName()+"執行:"+
                "分片參數:"+shardingContext.getShardingParameter()+
                ",當前分片項:"+shardingContext.getShardingItem()+
                ",time:"+ LocalDate.now());
    }
}

 

配置類,這裏用到了一個工具方法,工具類放下面:

/**
 * 配置MySimpleJob
 */
@Configuration
public class MySimpleJobConf {
    @Autowired ZookeeperRegistryCenter regCenter;
    @Autowired MySimpleJob mySimpleJob;
    /**
     * 配置任務調度: 參數:  任務
     *                    zk註冊中心
     *                    任務詳情
     */
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(@Value("${mySimpleJob.cron}") final String cron,  //yml註入
                                           @Value("${mySimpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${mySimpleJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(mySimpleJob, regCenter,
                                      ElasticJobUtils.getSimpleJobConfiguration(
                                              mySimpleJob.getClass(),
                                              cron,
                                              shardingTotalCount,
                                              shardingItemParameters)
                                              //,new MyElasticJobListener() 可配置監聽器
        );
    }
}

工具類:

技術分享圖片
public class ElasticJobUtils {

    /**
     * 創建簡單任務詳細信息
     */
    public static LiteJobConfiguration getSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, //任務類
                                                                final String cron,    // 運行周期配置
                                                                final int shardingTotalCount,  //分片個數
                                                                final String shardingItemParameters) {  // 分片參數
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }

    /**
     * 創建流式作業配置
     */
    public static LiteJobConfiguration getDataFlowJobConfiguration(final Class<? extends DataflowJob> jobClass, //任務類
                                                                   final String cron,    // 運行周期配置
                                                                   final int shardingTotalCount,  //分片個數
                                                                   final String shardingItemParameters,
                                                                   final Boolean streamingProcess   //是否是流式作業
                                                                   ) {  // 分片參數
        return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                // true為流式作業,除非fetchData返回數據為null或者size為0,否則會一直執行
                // false 非流式,只會按配置時間執行一次
                , jobClass.getCanonicalName(),streamingProcess)
        ).overwrite(true).build();
    }
}
View Code

測試:

技術分享圖片

三 dataflowjob

job類:

@Component
public class MyDataFlowJob implements DataflowJob<String> {
    @Override
    public List<String> fetchData(ShardingContext shardingContext) { //抓取數據
        System.out.println("---------獲取數據---------");
        return Arrays.asList("1","2","3");
    }
    @Override
    public void processData(ShardingContext shardingContext, List<String> list) {//處理數據
        System.out.println("---------處理數據---------");
        list.forEach(x-> System.out.println("數據處理:"+x));
    }
}

配置類:

@Configuration
public class MyDataFlowJobConf {
    @Autowired ZookeeperRegistryCenter regCenter;
    @Autowired MyDataFlowJob myDataFlowJob;
    /**
     * 配置任務調度: 參數:  任務
     *                    zk註冊中心
     *                    任務詳情
     */
    @Bean(initMethod = "init")
    public JobScheduler dataFlowJobScheduler(@Value("${myDataFlowJob.cron}") final String cron,  //yml註入
                                           @Value("${myDataFlowJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${myDataFlowJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(myDataFlowJob, regCenter,
                ElasticJobUtils.getDataFlowJobConfiguration(
                        myDataFlowJob.getClass(),
                        cron,
                        shardingTotalCount,
                        shardingItemParameters,true)
                //,new MyElasticJobListener() 可配置監聽器
        );
    }
}

測試:

技術分享圖片

需要註意一點流式作業如果數據不為空會一直跑

四 scriptjob

五 分片用法

分片的目的就是通過配置分片個數,讓不同的分片參數到不同的服務中去,比如配置了分片個數是2,那麽分片一會到服務一種,分片二到服務二中

項目匯中科根據分片參數來決定哪個服務處理哪些數據,比如 0=客戶甲乙,1=客戶乙,但是分片item是從1開始

分片算法默認是平均,可自定義,然後參數就是上面yml那種配置,比如2,就是 0=,1= 4就是0=,1=,2=,3=,兩個服務的話服務一就是0,1的參數,服務二就是2,3的參數,並且分片item是3,4

然後要註意一點的是,這個分片識別是根據ip的,也就是說同一臺電腦,跑兩個程序沒用,兩個程序都會全部執行,還是會重復

主要是這個分片保證分布式中處理數據不重復,分片也會轉移,即一個服務掛了之後,分片參數和item會自動轉移到剩下服務中

六 監聽器用法

六 容易踩的坑

一 配置類中配置bean的時候,方法名不要重復,否則會發現任務不跑,

二 測試分布式的時候,必須跑在ip不一樣的服務上,否則不會實現分片

三 我的版本再pom裏面,springboot版本是2.0.6,版本不一樣可能用法也有些區別

四 理論上xml更簡單,但是我個人比較喜歡代碼風格,哈哈

springboot整合elasticJob實戰(純代碼開發三種任務類型用法)以及分片系統詳解