1. 程式人生 > >分散式開源排程框架TBSchedule原理與應用

分散式開源排程框架TBSchedule原理與應用

主要內容:

第一部分 TBSchedule基本概念及原理
1. 概念介紹
2. 工作原理
3. 原始碼分析
4. 與其他開源排程框架對比
第二部分 TBSchedule分散式排程示例
1. TBSchedule原始碼下載
2. 引入原始碼Demo開發示例
3. 控制檯配置任務排程
4. selectTasks方法引數說明
5. 建立排程策略引數說明
6. 建立任務引數說明

第一部分 TBSchedule基本概念及原理

1. 概念介紹

TBSchedule是一個支援分散式的排程框架,能讓一種批量任務或者不斷變化的任務,被動態的分配到多個主機的JVM中,不同的執行緒組中並行執行。基於ZooKeeper的純Java實現,由Alibaba開源。

2. 工作原理

TBSchesule對分散式的支援包括排程機的分散式和執行機的分散式,其網路部署架構圖如下:

這裡寫圖片描述

2.1 資料儲存

執行機和排程機均以ZooKeeper為註冊中心,所有資料以節點及節點內容的形式註冊,通過定時彙報主機狀態保持存活在ZooKeeper上。

首先看下執行機對ZooKeeper的使用配置檔案:

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName"> <bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory" init-method="init"> <property name="zkConfig"> <map> <entry key="zkConnectString"
value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.catalog}" /> <entry key="zkSessionTimeout" value="${schedule.timeout}" /> <entry key="userName" value="${schedule.username}" /> <entry key="password" value="${schedule.password}" /> <entry key="isCheckParentPath" value="true" /> </map> </property> </bean> </beans>

1)執行機部署啟動,會在ZooKeeper上建立永久根節點schedule.zookeeper.address,其後所有的操作均在該根節點下進行。

這裡以/ttest/creditjob為根節點,檢視執行機註冊後情況:

[zk: 172.26.50.86:2181(CONNECTED) 28] ls /ttest/creditjob
[strategy, baseTaskType, factory]
[zk: 172.26.50.86:2181(CONNECTED) 29] ls /ttest/creditjob/factory
[127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000, 127.0.0.1$MIE-ZHANGTAO-D1$D826BC6565DC4D6CB85F7AE321EE51AE$0000000001]

可以看到根節點下面有3個永久子節點,strategy儲存排程機建立的策略資訊,baseTaskType儲存排程機建立的任務資訊,factory儲存執行機註冊的主機資訊。每臺執行機啟動後,都會在factory下建立一個臨時順序子節點,該節點名是由TBSchedule原始碼生成的主機唯一表示。

根節點內容為當前TBSchedule內建版本號,可在程式修改,實際沒什麼意義。

[zk: 172.26.50.86:2181(CONNECTED) 17] get /ttest/creditjob
tbschedule-3.2.12

2)排程機部署啟動,這時不會對ZooKeeper節點做任何操作。開啟排程機配置面板:

這裡寫圖片描述

配置好ZooKeeper接入點,點選管理主頁,進入排程任務管理面板:

這裡寫圖片描述

輸入各項引數建立新任務後,此時會在baseTaskType下面建立任務名稱永久子節點(排程機所有都宕機重啟後,仍能保持資料的完整性),而當前節點的內容就是配置的各項引數。

[zk: 172.26.50.86:2181(CONNECTED) 37] ls /ttest/creditjob/baseTaskType
[IScheduleTaskDealSingleTest]

[zk: 172.26.50.86:2181(CONNECTED) 39] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest
{"baseTaskType":"IScheduleTaskDealSingleTest","heartBeatRate":5000,"judgeDeadInterval":60000,"sleepTimeNoData":500,"sleepTimeInterval":0,"fetchDataNumber":500,"executeNumber":1,"threadNumber":1,"processorType":"SLEEP","permitRunStartTime":"0 * * * * ?","expireOwnSignInterval":1.0,"dealBeanName":"iScheduleTaskDealSingleTest","taskParameter":"0","taskKind":"static","taskItems":["0"],"maxTaskItemsOfOneThreadGroup":0,"version":0,"sts":"resume"}

3)建立排程策略,控制排程機排程狀態。

這裡寫圖片描述

建立完成排程策略後開啟排程,此過程會在對應的任務節點strategy下建立永久子節點並寫入策略資料,在該子節點下建立表示排程機的臨時順序子節點並寫入排程策略資料。

[zk: 172.26.50.86:2181(CONNECTED) 56] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest
{"strategyName":"IScheduleTaskDealSingleTest","IPList":["127.0.0.1"],"numOfSingleServer":0,"assignNum":1,"kind":"Schedule","taskName":"IScheduleTaskDealSingleTest","taskParameter":"0","sts":"resume"}

[zk: 172.26.50.86:2181(CONNECTED) 57] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest/127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000
{"strategyName":"IScheduleTaskDealSingleTest","uuid":"127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000","requestNum":1,"currentNum":0,"message":""}

同時會在baseTaskType/IScheduleTaskDealSingleTest下建立下建立兩層永久子節點並註冊排程主機資料。

[zk: 172.26.50.86:2181(CONNECTED) 45] ls /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest
[taskItem, server]

[zk: 172.26.50.86:2181(CONNECTED) 50] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/taskItem
IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000000,IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000001

[zk: 172.26.50.86:2181(CONNECTED) 51] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/server  
reload=true
2.2 分散式高可用高效率保障

1)排程機的高可用有保障,多排程機向註冊中心註冊後,共享排程任務,且同一排程任務僅由一臺排程機執行排程,當前排程機異常宕機後,其餘的排程機會接上。

2)執行機的高可用有保障,多執行機向註冊中心註冊後,配置執行機單執行緒(多機匯流排程為1)執行任務,排程機會隨機啟動一臺執行機執行,當前執行異常機宕機後,排程機會會新排程一臺執行機。

3)執行機的並行高效保障,配置執行機多執行緒且劃分多工子項後,各任務子項均衡分配到所有執行機,各執行機均執行,多執行緒資料一致性協調由任務項引數區分。

4)彈性擴充套件失效轉移保障,執行中的執行機宕機,或新增執行機,排程機將在下次任務執行前重新分配任務項,不影響正常執行機任務(崩潰的執行機當前任務處理失效);執行中的排程機宕機或動態新增排程機,不影響執行機當前任務,排程機宕機後動態切換。

3. 原始碼分析
3.1 執行機註冊節選

從Spring配置檔案可以看到,執行機註冊的入口在TBScheduleManagerFactoryinit方法,程式碼片段:

public class TBScheduleManagerFactory implements ApplicationContextAware {
    public void init() throws Exception {
        Properties properties = new Properties();
        for(Map.Entry<String,String> e: this.zkConfig.entrySet()){
            properties.put(e.getKey(),e.getValue());
        }
        this.init(properties);
    }

    public void init(Properties p) throws Exception {
        if(this.initialThread != null){
            this.initialThread.stopThread();
        }
        this.lock.lock();
        try{
            this.scheduleDataManager = null;
            this.scheduleStrategyManager = null;
            ConsoleManager.setScheduleManagerFactory(this);
            if(this.zkManager != null){
                this.zkManager.close();
            }
            this.zkManager = new ZKManager(p);
            this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr();
            initialThread = new InitialThread(this);
            initialThread.setName("TBScheduleManagerFactory-initialThread");
            initialThread.start();
        }finally{
            this.lock.unlock();
        }
    }
}

init方法將配置引數封裝到Properties物件後開始初始化,連線上ZooKeeper並啟動一個新的執行緒進行節點資料處理。

this.zkManager = new ZKManager(p);
...
initialThread = new InitialThread(this);
initialThread.start();

跟蹤程式碼可以看到新執行緒呼叫的實際處理方法是:

public void initialData() throws Exception{

    /** 遞迴建立永久根節點(/ttest/creditjob)並寫入版本資訊 */
    this.zkManager.initial();

    /** 建立永久子節點 baseTaskType */
    this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);

    /** 建立永久子節點 strategy 和 factory */
    this.scheduleStrategyManager  = new ScheduleStrategyDataManager4ZK(this.zkManager);

    if (this.start == true) {

        /** 註冊排程管理器,建立臨時順序子節點,節點表示主機的註冊資訊 */
        this.scheduleStrategyManager.registerManagerFactory(this);
        if(timer == null){
            timer = new Timer("TBScheduleManagerFactory-Timer");
        }

        if(timerTask == null){
            /** 啟動一個定時器檢測ZooKeeper狀態,如果連線失敗,關閉所有的任務後,重新連線Zookeeper伺服器 */
            timerTask = new ManagerFactoryTimerTask(this);
            timer.schedule(timerTask, 2000,this.timerInterval);
        }
    }
}

上述幾個節點建立完成,並向ZooKeeper註冊監聽,當有資料變化時獲得通知(任務執行/暫停)。到這裡,就完成了執行機到ZooKeeper的註冊監聽過程。

3.2 排程任務建立節選

任務建立提交儲存為入口,將引數封裝到ScheduleTaskType物件中,呼叫節點建立和更新方法:

//taskTypeEdit.jsp->taskTypeDeal.jsp

if(action.equalsIgnoreCase("createTaskType")){
    ConsoleManager.getScheduleDataManager().createBaseTaskType(taskType);
    result = "任務" + baseTaskType + "建立成功!!!!";
}else{
    ConsoleManager.getScheduleDataManager().updateBaseTaskType(taskType);
    result = "任務" + baseTaskType + "修改成功!!!!";          
}

真正執行任務節點及資料處理的程式碼段:

//ScheduleDataManager4ZK.java

public void createBaseTaskType(ScheduleTaskType baseTaskType) throws Exception {
    if(baseTaskType.getBaseTaskType().indexOf("$") > 0){
        throw new Exception("排程任務" + baseTaskType.getBaseTaskType() +"名稱不能包括特殊字元 $");
    }

    /** 在 baseTaskType 節點下建立任務永久節點並寫入節點內容為任務配置引數 */
    String zkPath = this.PATH_BaseTaskType + "/"+ baseTaskType.getBaseTaskType();
    String valueString = this.gson.toJson(baseTaskType);
    if ( this.getZooKeeper().exists(zkPath, false) == null) {
        this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
    } else {
    throw new Exception("排程任務" + baseTaskType.getBaseTaskType() + "已經存在,如果確認需要重建,請先呼叫deleteTaskType(String baseTaskType)刪除");
    }
}

如果是更新的話,就不會再建立任務永久節點了,直接修改任務節點內容即可。

3.3 策略建立節選

策略建立提交儲存為入口,將引數封裝到ScheduleStrategy物件中,呼叫節點建立和更新方法:

//scheduleStrategyEdit.jsp->scheduleStrategyDeal.jsp

if (action.equalsIgnoreCase("createScheduleStrategy")) {
    ConsoleManager.getScheduleStrategyManager().createScheduleStrategy(scheduleStrategy);
    isRefreshParent = true;
} else if (action.equalsIgnoreCase("editScheduleStrategy")) {
    ConsoleManager.getScheduleStrategyManager().updateScheduleStrategy(scheduleStrategy);
    isRefreshParent = true;
}

真正執行任務節點及資料處理的程式碼段:

//ScheduleStrategyDataManager4ZK.java

public void createScheduleStrategy(ScheduleStrategy scheduleStrategy) throws Exception {
    String zkPath = this.PATH_Strategy + "/"+ scheduleStrategy.getStrategyName();

    /** 在 strategy 節點下建立任務永久節點並寫入節點內容為任務配置引數 */
    String valueString = this.gson.toJson(scheduleStrategy);
    if ( this.getZooKeeper().exists(zkPath, false) == null) {
        this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
    } else {
        throw new Exception("排程策略" + scheduleStrategy.getStrategyName() + "已經存在,如果確認需要重建,請先呼叫deleteMachineStrategy(String taskType)刪除");
    }
}

如果是更新的話,就不會再建立任務永久節點了,直接修改任務節點內容即可。

3.4 排程控制節選

策略排程控制程式碼段:

//scheduleStrategyList.jsp->scheduleStrategyDeal.jsp

else if (action.equalsIgnoreCase("deleteScheduleStrategy")) {
    ConsoleManager.getScheduleStrategyManager()
                    .deleteMachineStrategy(
    scheduleStrategy.getStrategyName());
    isRefreshParent = true;
} else if (action.equalsIgnoreCase("pauseTaskType")) {
    ConsoleManager.getScheduleStrategyManager().pause(
                    scheduleStrategy.getStrategyName());
    isRefreshParent = true;
} else if (action.equalsIgnoreCase("resumeTaskType")) {
    ConsoleManager.getScheduleStrategyManager().resume(
                    scheduleStrategy.getStrategyName());
    isRefreshParent = true;
} 

真正執行任務節點及資料處理的程式碼段:

//ScheduleStrategyDataManager4ZK.java

/** 策略刪除,即刪除strategy下對應的策略節點及資料 */
public void deleteMachineStrategy(String taskType) throws Exception {
    deleteMachineStrategy(taskType,false);
}

/** 排程暫停,即修改strategy下對應的策略節點的狀態標示資料 */
public void pause(String strategyName) throws Exception{
    ScheduleStrategy strategy = this.loadStrategy(strategyName);
    strategy.setSts(ScheduleStrategy.STS_PAUSE);
    this.updateScheduleStrategy(strategy);
}

/** 排程啟動,即修改strategy下對應的策略節點的狀態標示資料 */
public void resume(String strategyName) throws Exception{
    ScheduleStrategy strategy = this.loadStrategy(strategyName);
    strategy.setSts(ScheduleStrategy.STS_RESUME);
    this.updateScheduleStrategy(strategy);      
}

修改節點資料,通過ZooKeeper的事件通知機制,讓執行機獲得變更通知。

4. 與其他開源排程框架對比

1)Quartz:Java事實上的定時任務標準。但Quartz關注點在於定時任務而非資料,並無一套根據資料處理而定製化的流程。雖然Quartz可以基於資料庫實現作業的高可用,缺少分散式並行執行作業的功能。

2)Crontab:Linux系統級的定時任務執行器,缺乏分散式和集中管理功能。

3)elastic-job:噹噹網最近開源專案,功能跟TBSchedule幾乎一樣(批鬥TBSchedule文件缺失嚴重),一臺伺服器只能開啟一個任務例項,基於Ip不基於IpPort,單機難除錯叢集功能。

4)TBSchedule:淘寶早期開源,穩定性可以保證。

第二部分 TBSchedule分散式排程示例

1. TBSchedule原始碼下載

下載TBSchedule原始碼工程,內容包括兩部分:工程編譯成jar的任務開發依賴包和工程編譯成war的排程控制檯。

2. 引入原始碼Demo開發示例

當前示例與Spring整合,原始碼可作為普通工程依賴入任務工程,也可將其打包成jar並引入依賴,此處版本為3.2.2.2

補充:若打包失敗,請檢查編譯外掛版本及jdk編譯版本。

任務工程依賴

<dependency>
    <groupId>com.taobao.pamirs.schedule</groupId>
    <artifactId>tbschedule</artifactId>
    <version>3.3.3.2</version>
</dependency>

排程任務實現IScheduleTaskDealSingle,並實現selectTasksexecute方法,詳細示例:

Component("iScheduleTaskDealSingleTest")
public class IScheduleTaskDealSingleTest implements IScheduleTaskDealSingle<TaskModel> {

    private static final Logger LOG = LoggerFactory.getLogger(IScheduleTaskDealSingleTest.class);

    @Override
    public Comparator<TaskModel> getComparator() {
        return null;
    }

    @Override
    public List<TaskModel> selectTasks(String taskParameter, String ownSign, int taskQueueNum,
            List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception {

        LOG.info("IScheduleTaskDealSingleTest配置的引數,taskParameter:{},ownSina:{},taskQueueNum:{},taskItemList:{}, eachFetchDataNum:{}", taskParameter, ownSign, taskQueueNum, taskItemList, eachFetchDataNum);

        LOG.info("IScheduleTaskDealSingleTest選擇任務列表開始啦..........");
        List<TaskModel> models = new ArrayList<TaskModel>();
        models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest1"));
        models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest2"));

        return models;

    }

    @Override
    public boolean execute(TaskModel model, String ownSign) throws Exception {

        LOG.info("IScheduleTaskDealSingleTest執行開始啦.........." + new Date());
        System.out.println(model);
        return true;

    }

}

其中selectTasks方法獲取需要處理的列表(用集合裝著),迴圈集合中的元素並呼叫execute方法執行。子計時任務啟動,會直到獲取不到資料後才停止等待下一個子計時開始,引數後面詳細介紹。

將排程任務註冊到zookeeper中心,spring中引入如下配置:

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName">
    <bean id="scheduleManagerFactory"
        class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
        init-method="init">
        <property name="zkConfig">
            <map>
                <entry key="zkConnectString" value="${schedule.zookeeper.address}" />
                <entry key="rootPath" value="${schedule.root.catalog}" />
                <entry key="zkSessionTimeout" value="${schedule.timeout}" />
                <entry key="userName" value="${schedule.username}" />
                <entry key="password" value="${schedule.password}" />
                <entry key="isCheckParentPath" value="true" />
            </map>
        </property>
    </bean>
</beans>

環境屬性配置檔案增加如下配置:

#註冊中心地址
schedule.zookeeper.address=172.26.50.86:2181
#定時任務根目錄,任意指定,排程控制檯配置時對應
schedule.root.catalog=/tbschedule/example
#賬戶,任意指定,排程控制檯配置時對應
schedule.username=username
#密碼,任意指定,排程控制檯配置時對應
schedule.password=password
#超時配置
schedule.timeout=60000

啟動容器,iScheduleTaskDealSingleTest就完成了到zookeeper中心的註冊。

補充:TBSchedule提供了IScheduleTaskDealSingle和IScheduleTaskDealMulti兩個介面,個人在測試中發現兩者除了execute方法上引數不同外,功能上並沒有別的不同,只是語義上的區分,在處理模式為SLEEP下getComparator()沒有用,一般情況下,都是SLEEP模式。

3. 控制檯配置任務排程

將控制檯ScheduleConsole.war部署到tomcat容器。

補充:我通過ant執行原始碼中的build.xml構建控制檯,部署執行失敗(沒用過ant,目前不知道原因),這種情況下:

使用方式一,直接用下載包中的控制檯部署即可。
使用方式二,修改工程配置打成war包,這靈活,還可以自定義修改,原始檔不支援中文,可將編碼改成utf-8支援。

向註冊中心註冊配置(跟任務註冊用同一根目錄,官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/config.jsp

配置排程任務(官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/index.jsp

4. selectTasks方法引數說明

taskParameter:對應控制檯自定義引數,可自定義傳入做邏輯上的操作

taskQueueNum:對應控制檯任務項數量

taskItemList:集合中TaskItemDefine的id值對應任務項值,多執行緒處理時,根據任務項協調資料一致性和完整性

eachFetchDataNum:對應控制檯每次獲取數量,由於子計時單元開始後,會不斷的去取資料進行處理,直到取不到資料子計時才停止,等待下一個子計時開始。可以限制每次取數,防止一次性資料記錄過大,記憶體不足。

ownSign:環境引數,一般沒什麼用

5. 建立排程策略引數說明

策略名稱:策略標示,可任意填寫

任務型別:一般保持預設Schedule

任務名稱:對應工作列被排程任務名稱

任務引數:一般不用,保持預設

單JVM最大執行緒組數量:單個JVM允許開啟的執行緒數

最大執行緒組數量:多處理機情況下的執行緒總數限制(匯流排程為2,任務項執行緒為4是沒有意義的)

IP地址127.0.0.1或者localhost會在所有機器上執行,注意多處理機若沒有根據任務子項劃分資料處理,會導致多處理機重複處理資料,謹慎配置

建立示例,官方wiki上有圖示,上面主要是各引數的具體含義。

6. 建立任務引數說明

任務名稱:策略排程的標示,一旦建立儲存,不可更改

任務處理的SpringBean:註冊到spring的任務bean,如iScheduleTaskDealSingleTest

心跳頻率/假定服務死亡時間/處理模式/沒有資料時休眠時長/執行結束時間:一般保持預設即可

執行緒數:處理該任務的執行緒數,在沒有劃分多任務項的情況下,多執行緒是沒有意義的,且執行緒數量大於任務項也是沒有意義的(執行緒數小於等於任務項),注意如果開啟多執行緒,必須對資料做任務項過濾

單執行緒組最大任務項:配置單JVM處理的最大任務項數量,多任務項情況下,可按需限制,一般預設,多執行機會均衡分配

每次獲取數量:子計時單元開始,執行緒會不斷的去獲取資料(每次獲取的限制)並處理資料,直到獲取不到資料子計時才結束(方法內不用就可以隨意配置)

每次執行數量://還沒測試過(可能是將獲取的數量拆分多次執行)

每次處理完休眠時間:子計時單元開始,只要有資料,就會不停的獲取不停的處理,這個時間設定後,子計時單元開始每次獲取執行後,不管還有沒有待資料,都先歇會兒再獲取處理

自定義引數:可自定義控制任務邏輯操作

任務項:這項很重要,在多執行緒情況下,劃分任務項是有意義的,但是要注意必須通過任務項引數,協調待處理資料,否則多執行緒會重複處理

建立示例,官方wiki上有圖示,上面主要是各引數的具體含義。