1. 程式人生 > >分散式任務框架--Elastic-job

分散式任務框架--Elastic-job

一、簡介 Elastic-job是一個分散式排程解決方案,由2個相互獨立的子專案Elastic-job-Lite和Elastic-Job-cloud組成; Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務;Elastic-Job-Cloud採用噹噹網自研Mesos Framework的解決方案,額外提供資源治理,應用分發以及程序隔離等功能; 官方開源資料:https://github.com/dangdangdotcom/elastic-job 二、基本概念 我們開發定時任務一般都是使用quartz或者spring-task(ScheduledExecutorService),無論是使用quartz還是spring-task,我們都會至少遇到兩個痛點: 1.不敢輕易跟著應用服務多節點部署,可能會重複多次執行而引發系統邏輯的錯誤。 2.quartz的叢集僅僅只是用來單機鎖表防止重複多次執行,節點數量的增加並不能給我們的每次執行效率帶來提升,即不能實現水平擴充套件。 Elastic-Job主要的設計理念是無中心化的分散式定時排程框架,思路來源於Quartz的基於資料庫的高可用方案。但資料庫沒有分散式協調功能,所以在高可用方案的基礎上增加了彈性擴容和資料分片的思路,以便於更大限度的利用分散式伺服器的資源。 三、Elastic-Job-Lite原理 Elastic-Job在2.x之後,出了兩個產品線:Elastic-Job-Lite和Elastic-Job-Cloud。我們一般使用Elastic-Job-Lite就能夠滿足需求,本文也是以Elastic-Job-Lite為主。1.x系列對應的就只有Elastic-Job-Lite,並且在2.x裡修改了一些核心類名,差別雖大,原理類似,建議使用2.x系列 在這裡插入圖片描述

舉個典型的job場景,比如餘額寶裡的昨日收益,系統需要job在每天某個時間點開始,給所有餘額寶使用者計算收益。如果使用者數量不多,我們可以輕易使用quartz來完成,我們讓計息job在某個時間點開始執行,迴圈遍歷所有使用者計算利息,這沒問題。可是,如果使用者體量特別大,我們可能會面臨著在第二天之前處理不完這麼多使用者。另外,我們部署job的時候也得注意,我們可能會把job直接放在我們的webapp裡,webapp通常是多節點部署的,這樣,我們的job也就是多節點,多個job同時執行,很容易造成重複執行,比如使用者重複計息,為了避免這種情況,我們可能會對job的執行加鎖,保證始終只有一個節點能執行,或者乾脆讓job從webapp裡剝離出來,獨自部署一個節點。 Elastic-job就可以幫著我們解決上面的問題,Elastic底層的任務排程還是使用了quartz,通過zookkeeper來動態給job節點分片; 我們來看問題點: 1、存在大資料量的使用者需要再特定的時間段內完成計息 我們肯定希望我們的任務可以通過叢集達到水平擴充套件,叢集裡面的每一個節點都處理部分使用者,不管使用者數量有多大,我們只要增加機器就可以了,比如說單個機器在特定的時間內能處理N個使用者的計息,2臺機器處理2N個使用者,3臺3N….,再多的使用者也不怕了。 使用elastic-job開發的作業都是zookeeper的客戶端,比如我希望3臺機器跑job,我們將任務分成3片,框架通過zk的協調,最終會讓3臺機器分別分配到0,1,2的任務片,比如: server0–>0,server1–>1,server2–>2, 1、當server0執行時,可以只查詢id%30的使用者; 2、server1執行時,只查詢id%3
1的使用者; 3、server2執行時,只查詢id%3==2的使用者。 在上面的基礎上,我們再增加server3,此時,server3分不到任務分片,因為只有3片,已經分完了。沒有分到任務分片的作業程式將不執行。如果此時server2掛了,那麼server2的分片項會分配給server3,server3有了分片,就會替代server2執行。如果此時server3也掛了,只剩下server0和server1了,框架也會自動把server3的分片隨機分配給server0或者server1,可能會這樣,server0–>0,server1–>1,2。這種特性稱之為彈性擴容,即elastic-job名稱的由來。 四、作業型別 elastic-job提供了三種類型的作業:Simple型別作業、Dataflow型別作業、Script型別作業。這裡主要講解前兩者。Script型別作業意為指令碼型別作業,支援shell,python,perl等所有型別指令碼,使用不多,可以參見github文件。 SimpleJob需要實現SimpleJob介面,意為簡單實現,未經過任何封裝,與quartz原生介面相似,比如示例程式碼中所使用的job。 Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。 可通過DataflowJobConfiguration配置是否流式處理。 流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。 實際開發中,Dataflow型別的job還是很有好用的。 五、分片策略 AverageAllocationJobShardingStrategy 全路徑: io.elasticjob.lite.api.strategy.impl.AverageAllocationJobShardingStrategy 策略說明: 基於平均分配演算法的分片策略,也是預設的分片策略。 如果分片不能整除,則不能整除的多餘分片將依次追加到序號小的伺服器。如: 如果有3臺伺服器,分成9片,則每臺伺服器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8] 如果有3臺伺服器,分成8片,則每臺伺服器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5] 如果有3臺伺服器,分成10片,則每臺伺服器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8] OdevitySortByNameJobShardingStrategy 全路徑: io.elasticjob.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy 策略說明: 根據作業名的雜湊值奇偶數決定IP升降序演算法的分片策略。 作業名的雜湊值為奇數則IP升序。 作業名的雜湊值為偶數則IP降序。 用於不同的作業平均分配負載至不同的伺服器。 RotateServerByNameJobShardingStrategy 全路徑: io.elasticjob.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy 策略說明: 根據作業名的雜湊值對伺服器列表進行輪轉的分片策略。 六、程式碼示例 1、引入maven依賴

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.0.5</version>
</dependency>
<!-- 使用springframework自定義名稱空間時引入 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.0.5</version>
</dependency>

2、Simple作業開發 意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。

public class MySimpleJob implements SimpleJob {
 @Override
 public void execute(ShardingContext shardingContext) {
     int shardIndx = shardingContext.getShardingItem();
     if (shardIndx == 0) {
         //處理id為奇數的商家
         System.out.println(String.format("------Thread ID: %s, Total number of task slices: %s, current fragmentation items: %s",
                 Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
     } else {
         //處理id為偶數的商家
         System.out.println(String.format("------Thread ID: %s, Total number of task slices: %s, current fragmentation items: %s",
                 Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));

     }
     /**
      * 實際開發中,有了任務總片數和當前分片項,就可以對任務進行分片執行了
      * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
      */
 }
}

3、作業配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd">
    <!--配置作業註冊中心 -->
    <reg:zookeeper id="regCenter" server-lists="172.16.150.247:2181" namespace="dd-job"
                   base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3"/>

    <!-- 配置作業-->
    <job:simple id="mySimpleJob" class="com.liuxl.elastic.MySimpleJob" registry-center-ref="regCenter"
                sharding-total-count="2" cron="0/2 * * * * ?" overwrite="true"/>
    <!--failover:是否開啟任務執行失效轉移,開啟表示如果作業在一次任務執行中途宕機,允許將該次未完成的任務在另一作業節點上補償執行
    description:作業描述
    overwrite:本地配置是否可覆蓋註冊中心配置,如果可覆蓋,每次啟動作業都以本地配置為準
    event-trace-rdb-data-source:作業事件追蹤的資料來源Bean引用
    —>
</beans>

4、單機執行 本機執行 在這裡插入圖片描述 5、叢集執行 本機執行、132機器 在這裡插入圖片描述 在這裡插入圖片描述 6、Dataflow作業開發 public class MyElasticJob implements DataflowJob<List> {

    @Override
    public List<List<Object>> fetchData(ShardingContext shardingContext) {
        int shardIndx = shardingContext.getShardingItem();
        File file = new File("/Users/liuxl/Desktop/worker/智慧宣教引擎/20181012.xlsx");
        List<List<Object>> reslut = new ArrayList<>();
        try {
            List<ExcelSheetPO> pos = ExcelUtil.readExcel(file, 400, 3);
            if (shardIndx == 0) {
                List<List<Object>> list = pos.get(0).getDataList();
                //不要病區id
                for (List<Object> objects : list) {
                    objects.remove(1);
                }
                reslut = list;
            } else {
                List<List<Object>> list = pos.get(0).getDataList();
                // 不要診斷名字
                for (List<Object> objects : list) {
                    objects.remove(0);
                }
                reslut = list;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return reslut;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<List<Object>> list) {
        List<Object> objects = list.get(0);
        System.out.println(
                String.format("------Thread ID: %s, Total number of task slices: %s, current fragmentation items: %s,content : %s", Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), JSONObject.toJSON(objects)
                ));
    }
}

7、作業配置

<job:dataflow id="myDataFlowJob" class="com.liuxl.elastic.MyElasticJob" registry-center-ref=“regCenter" sharding-total-count="2" cron="0/2 * * * * ?" streaming-process="true" overwrite=“true" job-exceptionhandler=“com.liuxl.elastic.exception.MyJobExceptionHandler"/>

七、任務日誌捕捉 elastic-job允許使用者在任務排程異常時指定處理異常的異常處理器,異常處理器由介面JobExceptionHandler定義,定義如下

public interface JobExceptionHandler {
    void handleException(String var1, Throwable var2);
}

如果沒有指定自己的異常處理器elastic-job預設將使用DefaultJobExceptionHandler處理異常,其定義如下:

public final class DefaultJobExceptionHandler implements JobExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultJobExceptionHandler.class);

    public DefaultJobExceptionHandler() {
    }

    public void handleException(String jobName, Throwable cause) {
        log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
    }
}

以下是一個自定義的異常處理器的示例:

public class MyJobExceptionHandler implements JobExceptionHandler {

    private static final Logger logger = Logger.getLogger(MyJobExceptionHandler.class);

    @Override
    public void handleException(String jobName, Throwable cause) {
        logger.error(String.format("任務[%s]排程異常", jobName), cause);
    }
}

異常處理器的配置是通過job-exception-handler屬性指定的,所有作業型別的異常處理器的配置是通用的:

<job:dataflow id="myDataFlowJob" class="com.liuxl.elastic.MyElasticJob" registry-center-ref="regCenter"
              sharding-total-count="2" cron="0/2 * * * * ?" streaming-process="true" overwrite="true"
              job-exception-handler="com.liuxl.elastic.exception.MyJobExceptionHandler"/>