1. 程式人生 > >Elastic-Job之簡單Job

Elastic-Job之簡單Job

簡介

elastic-job是噹噹網開源的基於zookeeper和quartz實現的分散式作業排程框架。github地址是https://github.com/dangdangdotcom/elastic-job,官方網站是http://elasticjob.io/。elastic-job分elastic-job-lite和elastic-job-cloud,elastic-job-lite定位為輕量級的無中心化解決方案,本文要介紹的用法也是基於elastic-job-lite的。官方的文件其實挺齊全的,本文旨在對elastic-job的應用做一個簡單的介紹,也算是完善自己的知識庫,詳細的資訊請參考官方網站

核心概念

  • job:即要執行的任務
  • 分片:即把任務拆分成多個片段,分別排程。這些片段可以落在不同的節點上,所以在實現任務排程的介面時需要根據當前的片段來進行操作,否則就失去了意義。片段數是從0開始的,比如總的分片數是6,一共有兩臺機器,則第一臺機器上分配的片段數將是0,1,2,而第二臺機器上分配的片段數將是3,4,5。
  • 重新分片:當有新的機器加入或者有機器宕機的時候都將觸發重新分片。因為分片本來就是把總的片段數平均分配給不同的節點,節點數變了,每臺機器能夠分配的片段必將發生變化。

簡單任務

簡單任務對應於com.dangdang.ddframe.job.api.simple.SimpleJob介面,該介面的定義如下:

public interface SimpleJob extends ElasticJob {

    /**
     * 執行作業.
     *
     * @param shardingContext 分片上下文
     */
    void execute(ShardingContext shardingContext);
}

該介面只定義了一個方法,用於執行需要的任務,你可以把你的定時作業需要執行的邏輯在此方法中實現。elastic-job定時排程時就會排程該execute方法。該方法只接收一個ShardingContext型別的引數。該引數中包含了任務排程一些比較核心的資訊,比如分片總數、當前的分片等。任務的實現需要根據當前的片段數來進行,否則可能達不到你的預期效果。以下是一個簡單的示例。

/**
 * 普通作業,與Quartz的定時作業類似,只是會多了分片等功能
 * @author Elim
 * 2016年10月29日
 */
public class MyElasticJob implements SimpleJob {

    private static final Logger LOGGER = Logger.getLogger(MyElasticJob.class);

    @Override
    public void execute(ShardingContext context) {
        //當你的作業是分片的時候,你需要在你的Job的execute方法中根據當前的分片shardingItem的不同取值實現不同的邏輯,
        //要把所有的shardingItem都覆蓋到,因為在分散式環境,每臺機器都不能確保它當前的分片是哪一個,並且我們需要保持程式
        //的一致性,程式編寫好了對部署是不會有影響的。
        int shardingItem = context.getShardingItem();
        switch (shardingItem) {
            case 0:
                LOGGER.info("處理第一個分片");
                break;
            case 1: 
                LOGGER.info("處理第二個分片");
                break;
            case 2:
                LOGGER.info("處理第三個分片");
                break;
            case 3:
                LOGGER.info("處理第四個分片");
                break;
            case 4:
                LOGGER.info("處理第五個分片");
                break;
            case 5:
                LOGGER.info("處理第六個分片");
                break;
        }
        LOGGER.info(context);
    }

}

配置任務

定義好了作業任務的實現類後為了使作業任務生效,我們需要對其進行配置。配置有兩種方式,基於API的配置和基於Spring名稱空間的配置。以下介紹的都是基於Spring名稱空間的配置。

引入名稱空間

首先需要引入reg和job名稱空間,示例如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
</beans>

配置註冊中心

reg用於配置作用註冊中心,即配置zookeeper。

<reg:zookeeper id="regCenter" server-lists="localhost:2181"
    namespace="dd-job" base-sleep-time-milliseconds="1000"
    max-sleep-time-milliseconds="3000" max-retries="3" />
  • id用於給該註冊中心命名。
  • server-lists用於指定使用的zookeeper的地址,多個地址之間用英文的逗號分隔。
  • namespace用於指定註冊中心在zookeeper中的名稱空間,屬於zookeeper的概念。
  • base-sleep-time-milliseconds用於指定等待重試的間隔時間的初始值,單位是毫秒。
  • max-sleep-time-milliseconds用於指定等待重試的間隔時間的最大值,單位是毫秒。
  • max-retries用於指定最大的重試次數。

配置作業

作業通過job名稱空間配置,簡單任務通過<job:simple/>指定。

<job:simple id="myElasticJob" class="com.elim.learn.elastic.job.MyElasticJob"
    registry-center-ref="regCenter" cron="0/30 * * * * ?"
    sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
    failover="true" overwrite="true" />
  • id用於給該任務命名。
  • class用於指定需要應用的SimpleJob實現類。
  • registry-center-ref用於指定需要使用的註冊中心。
  • cron用於指定定時排程的規則,應用cron表示式的語法。
  • sharding-total-count用於指定總的分片數。
  • sharding-item-parameters用於指定每片對應的引數,該引數可以通過ShardingContext的getShardingParameter()獲取。
  • failover用於指定是否需要開啟失效轉移。只有在monitorExecution為true的情況下才有效,可以通過<job:simple monitor-execution="true"/>來指定,不過該屬性值預設也是true。
  • overwrite用於指定該配置是否需要用來覆蓋註冊中心的配置。修改了配置後一定要記得指定該屬性值為true,否則還是使用的註冊中心的舊的配置

使用上面的配置後,在Spring容器啟動後,我們的作業就會每30秒排程一次了。如果只有一臺機器,那麼上面的6片都會落到同一臺機器上,一共會發起6次排程。如果有兩臺機器就是每臺會得到三個分片,以此類推。當機器數量超出了分片數後,有的機器就會得不到分片,就沒有排程的機會,除非有機器宕機了,觸發了重新分片。

需要注意的是節點必須是在不同的機器上執行才行,一臺機器上啟動多個JVM是不會被認為是多個節點的,因為elastic-job是以客戶端的IP地址來識別一個節點的。

完整配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">

    <description>
        官方文件:http://elasticjob.io/index.html
    </description>

    <!-- 如果需要做分散式作業排程,則對應的例項必須是在多臺機器上跑的,因為elastic-job是以IP來區分一個節點的;另外namespace和使用的 
        zookeeper也必須是一樣的 -->
    <!--配置作業註冊中心,server-lists用於指定zookeeper的地址,多個zookeeper之間用逗號分隔;
        namespace用於指定zookeeper名稱空間;
        max-retries用於指定最大重試次數 -->
    <reg:zookeeper id="regCenter" server-lists="localhost:2181"
        namespace="dd-job" base-sleep-time-milliseconds="1000"
        max-sleep-time-milliseconds="3000" max-retries="3" />

    <!-- 可通過在http://repo1.maven.org/maven2/com/dangdang/elastic-job-console/1.1.1/下載對應的war包監控elastic-job的執行狀態 -->

    <!-- 配置作業 -->
    <!-- 引數overwrite為true即允許客戶端的作業配置覆蓋註冊中心的配置,每次啟動服務都會將客戶端的覆蓋註冊中心的, 預設為false。引數failover表示是否開啟失效轉移,預設為false,其它引數配置請參考官方文件 -->

    <!-- sharding-total-count引數用於指定分片數,當分片數大於機器數量的時候,每臺機器分配到的片數會是平均的, 第一片是從0開始的,比如總共分6片,有兩臺機器,則第一臺機器會分得0,1,2三片,而第二臺機器會分得3,4,5三片;當有 
        機器宕機了或者有新機器加入的時候都會觸發重新分片。如果有多臺機器,而分片總數是1的時候即相當於1主多從的配置。 sharding-item-parameters用於指定與分片對應的別名。 
        job-sharding-strategy-class:可以通過它來指定作業分片策略,可選策略可參考官方文件https://github.com/dangdangdotcom/elastic-job/blob/master/elastic-job-doc/content/post/user_guide/lite/other/lite_job_strategy.md。 -->
    <!-- SimpleJob的執行可以參考原始碼com.dangdang.ddframe.job.executor.type.SimpleJobExecutor的處理邏輯 -->
    <job:simple id="myElasticJob" class="com.elim.learn.elastic.job.MyElasticJob"
        registry-center-ref="regCenter" cron="0/30 * * * * ?"
        sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
        failover="true" overwrite="true" />

</beans>

如果你的job已經定義為了Spring的一個bean,那麼在定義<job:simple/>時也可以不指定class,而是指定job-ref屬性關聯job對應的bean,如:

<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"
    registry-center-ref="regCenter" cron="0/30 * * * * ?"
    sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
    failover="true" overwrite="true" />

這裡的bean需要定義在<job:simple/>的前面,否則會提示找不到對應的bean定義。

(本文由Elim寫於2017年10月1日)