1. 程式人生 > >分布式開源調度框架TBSchedule原理與應用

分布式開源調度框架TBSchedule原理與應用

中文 就會 ans exc 中心 崩潰 任務創建 集中 通過

主要內容:

第一部分 TBSchedule基本概念及原理
1. 概念介紹
2. 工作原理
3. 源代碼分析
4. 與其它開源調度框架對照

第二部分 TBSchedule分布式調度演示樣例
1. TBSchedule源代碼下載
2. 引入源代碼Demo開發演示樣例
3. 控制臺配置任務調度
4. selectTasks方法參數說明
5. 創建調度策略參數說明
6. 創建任務參數說明


第一部分 TBSchedule基本概念及原理

1. 概念介紹

TBSchedule是一個支持分布式的調度框架。能讓一種批量任務或者不斷變化的任務,被動態的分配到多個主機的JVM中,不同的線程組中並行運行。基於ZooKeeper的純Java實現,由Alibaba開源。

2. 工作原理

TBSchesule對分布式的支持包含調度機的分布式和運行機的分布式,其網絡部署架構圖例如以下:

技術分享

2.1 數據存儲

運行機和調度機均以ZooKeeper為註冊中心,全部數據以節點及節點內容的形式註冊,通過定時匯報主機狀態保持存活在ZooKeeper上。

首先看下運行機對ZooKeeper的使用配置文件:

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName"> <bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory" init-method="init"> <property name="zkConfig"> <map> <entry key="zkConnectString"
value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.catalog}" /> <entry key="zkSessionTimeout" value="${schedule.timeout}" /> <entry key="userName" value="${schedule.username}" /> <entry key="password" value="${schedule.password}" /> <entry key="isCheckParentPath" value="true" /> </map> </property> </bean> </beans>

1)運行機部署啟動,會在ZooKeeper上創建永久根節點schedule.zookeeper.address,其後全部的操作均在該根節點下進行。

這裏以/ttest/creditjob為根節點。查看運行機註冊後情況:

[zk: 172.26.50.86:2181(CONNECTED) 28] ls /ttest/creditjob
[strategy, baseTaskType, factory]
[zk: 172.26.50.86:2181(CONNECTED) 29] ls /ttest/creditjob/factory
[127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000, 127.0.0.1$MIE-ZHANGTAO-D1$D826BC6565DC4D6CB85F7AE321EE51AE$0000000001]

能夠看到根節點以下有3個永久子節點,strategy存儲調度機創建的策略信息,baseTaskType存儲調度機創建的任務信息,factory存儲運行機註冊的主機信息。每臺運行機啟動後。都會在factory下創建一個暫時順序子節點,該節點名是由TBSchedule源代碼生成的主機唯一表示。

根節點內容為當前TBSchedule內置版本號號。可在程序改動,實際沒什麽意義。

[zk: 172.26.50.86:2181(CONNECTED) 17] get /ttest/creditjob
tbschedule-3.2.12

2)調度機部署啟動,這時不會對ZooKeeper節點做不論什麽操作。打開調度機配置面板:

技術分享

配置好ZooKeeper接入點,點擊管理主頁。進入調度任務管理面板:

技術分享

輸入各項參數創建新任務後,此時會在baseTaskType以下創建任務名稱永久子節點(調度機全部都宕機重新啟動後。仍能保持數據的完整性)。而當前節點的內容就是配置的各項參數。

[zk: 172.26.50.86:2181(CONNECTED) 37] ls /ttest/creditjob/baseTaskType
[IScheduleTaskDealSingleTest]

[zk: 172.26.50.86:2181(CONNECTED) 39] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest
{"baseTaskType":"IScheduleTaskDealSingleTest","heartBeatRate":5000,"judgeDeadInterval":60000,"sleepTimeNoData":500,"sleepTimeInterval":0,"fetchDataNumber":500,"executeNumber":1,"threadNumber":1,"processorType":"SLEEP","permitRunStartTime":"0 * * * * ?","expireOwnSignInterval":1.0,"dealBeanName":"iScheduleTaskDealSingleTest","taskParameter":"0","taskKind":"static","taskItems":["0"],"maxTaskItemsOfOneThreadGroup":0,"version":0,"sts":"resume"}

3)創建調度策略,控制調度機調度狀態。

技術分享

創建完畢調度策略後開啟調度,此過程會在相應的任務節點strategy下創建永久子節點並寫入策略數據。在該子節點下創建表示調度機的暫時順序子節點並寫入調度策略數據。

[zk: 172.26.50.86:2181(CONNECTED) 56] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest
{"strategyName":"IScheduleTaskDealSingleTest","IPList":["127.0.0.1"],"numOfSingleServer":0,"assignNum":1,"kind":"Schedule","taskName":"IScheduleTaskDealSingleTest","taskParameter":"0","sts":"resume"}

[zk: 172.26.50.86:2181(CONNECTED) 57] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest/127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000
{"strategyName":"IScheduleTaskDealSingleTest","uuid":"127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000","requestNum":1,"currentNum":0,"message":""}

同一時候會在baseTaskType/IScheduleTaskDealSingleTest下創建下創建兩層永久子節點並註冊調度主機數據。

[zk: 172.26.50.86:2181(CONNECTED) 45] ls /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest
[taskItem, server]

[zk: 172.26.50.86:2181(CONNECTED) 50] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/taskItem
IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000000,IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000001

[zk: 172.26.50.86:2181(CONNECTED) 51] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/server  
reload=true
2.2 分布式高可用高效率保障

1)調度機的高可用有保障,多調度機向註冊中心註冊後,共享調度任務。且同一調度任務僅由一臺調度機運行調度。當前調度機異常宕機後。其余的調度機會接上。

2)運行機的高可用有保障,多運行機向註冊中心註冊後,配置運行機單線程(多機總線程為1)運行任務。調度機會隨機啟動一臺運行機運行。當前運行異常機宕機後。調度機會會新調度一臺運行機。

3)運行機的並行高效保障,配置運行機多線程且劃分多任務子項後,各任務子項均衡分配到全部運行機,各運行機均運行,多線程數據一致性協調由任務項參數區分。

4)彈性擴展失效轉移保障,運行中的運行機宕機,或新增運行機。調度機將在下次任務運行前又一次分配任務項,不影響正常運行機任務(崩潰的運行機當前任務處理失效)。運行中的調度機宕機或動態新增調度機,不影響運行機當前任務,調度機宕機後動態切換。

3. 源代碼分析
3.1 運行機註冊節選

從Spring配置文件能夠看到,運行機註冊的入口在TBScheduleManagerFactoryinit方法,代碼片段:

public class TBScheduleManagerFactory implements ApplicationContextAware {
    public void init() throws Exception {
        Properties properties = new Properties();
        for(Map.Entry<String,String> e: this.zkConfig.entrySet()){
            properties.put(e.getKey(),e.getValue());
        }
        this.init(properties);
    }

    public void init(Properties p) throws Exception {
        if(this.initialThread != null){
            this.initialThread.stopThread();
        }
        this.lock.lock();
        try{
            this.scheduleDataManager = null;
            this.scheduleStrategyManager = null;
            ConsoleManager.setScheduleManagerFactory(this);
            if(this.zkManager != null){
                this.zkManager.close();
            }
            this.zkManager = new ZKManager(p);
            this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr();
            initialThread = new InitialThread(this);
            initialThread.setName("TBScheduleManagerFactory-initialThread");
            initialThread.start();
        }finally{
            this.lock.unlock();
        }
    }
}

init方法將配置參數封裝到Properties對象後開始初始化,連接上ZooKeeper並啟動一個新的線程進行節點數據處理。

this.zkManager = new ZKManager(p);
...
initialThread = new InitialThread(this);
initialThread.start();

跟蹤代碼能夠看到新線程調用的實際處理方法是:

public void initialData() throws Exception{

    /** 遞歸創建永久根節點(/ttest/creditjob)並寫入版本號信息 */
    this.zkManager.initial();

    /** 創建永久子節點 baseTaskType */
    this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);

    /** 創建永久子節點 strategy 和 factory */
    this.scheduleStrategyManager  = new ScheduleStrategyDataManager4ZK(this.zkManager);

    if (this.start == true) {

        /** 註冊調度管理器,創建暫時順序子節點。節點表示主機的註冊信息 */
        this.scheduleStrategyManager.registerManagerFactory(this);
        if(timer == null){
            timer = new Timer("TBScheduleManagerFactory-Timer");
        }

        if(timerTask == null){
            /** 啟動一個定時器檢測ZooKeeper狀態,假設連接失敗,關閉全部的任務後,又一次連接Zookeeper服務器 */
            timerTask = new ManagerFactoryTimerTask(this);
            timer.schedule(timerTask, 2000,this.timerInterval);
        }
    }
}

上述幾個節點創建完畢,並向ZooKeeper註冊監聽,當有數據變化時獲得通知(任務運行/暫停)。到這裏。就完畢了運行機到ZooKeeper的註冊監聽過程。

3.2 調度任務創建節選

任務創建提交保存為入口。將參數封裝到ScheduleTaskType對象中,調用節點創建和更新方法:

//taskTypeEdit.jsp->taskTypeDeal.jsp

if(action.equalsIgnoreCase("createTaskType")){
    ConsoleManager.getScheduleDataManager().createBaseTaskType(taskType);
    result = "任務" + baseTaskType + "創建成功!

!!

"; }else{ ConsoleManager.getScheduleDataManager().updateBaseTaskType(taskType); result = "任務" + baseTaskType + "改動成功!

!。"; }

真正運行任務節點及數據處理的代碼段:

//ScheduleDataManager4ZK.java

public void createBaseTaskType(ScheduleTaskType baseTaskType) throws Exception {
    if(baseTaskType.getBaseTaskType().indexOf("$") > 0){
        throw new Exception("調度任務" + baseTaskType.getBaseTaskType() +"名稱不能包含特殊字符 $");
    }

    /** 在 baseTaskType 節點下創建任務永久節點並寫入節點內容為任務配置參數 */
    String zkPath = this.PATH_BaseTaskType + "/"+ baseTaskType.getBaseTaskType();
    String valueString = this.gson.toJson(baseTaskType);
    if ( this.getZooKeeper().exists(zkPath, false) == null) {
        this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
    } else {
    throw new Exception("調度任務" + baseTaskType.getBaseTaskType() + "已經存在,假設確認須要重建,請先調用deleteTaskType(String baseTaskType)刪除");
    }
}

假設是更新的話,就不會再創建任務永久節點了,直接改動任務節點內容就可以。

3.3 策略創建節選

策略創建提交保存為入口,將參數封裝到ScheduleStrategy對象中。調用節點創建和更新方法:

//scheduleStrategyEdit.jsp->scheduleStrategyDeal.jsp

if (action.equalsIgnoreCase("createScheduleStrategy")) {
    ConsoleManager.getScheduleStrategyManager().createScheduleStrategy(scheduleStrategy);
    isRefreshParent = true;
} else if (action.equalsIgnoreCase("editScheduleStrategy")) {
    ConsoleManager.getScheduleStrategyManager().updateScheduleStrategy(scheduleStrategy);
    isRefreshParent = true;
}

真正運行任務節點及數據處理的代碼段:

//ScheduleStrategyDataManager4ZK.java

public void createScheduleStrategy(ScheduleStrategy scheduleStrategy) throws Exception {
    String zkPath = this.PATH_Strategy + "/"+ scheduleStrategy.getStrategyName();

    /** 在 strategy 節點下創建任務永久節點並寫入節點內容為任務配置參數 */
    String valueString = this.gson.toJson(scheduleStrategy);
    if ( this.getZooKeeper().exists(zkPath, false) == null) {
        this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
    } else {
        throw new Exception("調度策略" + scheduleStrategy.getStrategyName() + "已經存在,假設確認須要重建,請先調用deleteMachineStrategy(String taskType)刪除");
    }
}

假設是更新的話,就不會再創建任務永久節點了,直接改動任務節點內容就可以。

3.4 調度控制節選

策略調度控制代碼段:

//scheduleStrategyList.jsp->scheduleStrategyDeal.jsp

else if (action.equalsIgnoreCase("deleteScheduleStrategy")) {
    ConsoleManager.getScheduleStrategyManager()
                    .deleteMachineStrategy(
    scheduleStrategy.getStrategyName());
    isRefreshParent = true;
} else if (action.equalsIgnoreCase("pauseTaskType")) {
    ConsoleManager.getScheduleStrategyManager().pause(
                    scheduleStrategy.getStrategyName());
    isRefreshParent = true;
} else if (action.equalsIgnoreCase("resumeTaskType")) {
    ConsoleManager.getScheduleStrategyManager().resume(
                    scheduleStrategy.getStrategyName());
    isRefreshParent = true;
} 

真正運行任務節點及數據處理的代碼段:

//ScheduleStrategyDataManager4ZK.java

/** 策略刪除,即刪除strategy下相應的策略節點及數據 */
public void deleteMachineStrategy(String taskType) throws Exception {
    deleteMachineStrategy(taskType,false);
}

/** 調度暫停,即改動strategy下相應的策略節點的狀態標示數據 */
public void pause(String strategyName) throws Exception{
    ScheduleStrategy strategy = this.loadStrategy(strategyName);
    strategy.setSts(ScheduleStrategy.STS_PAUSE);
    this.updateScheduleStrategy(strategy);
}

/** 調度啟動,即改動strategy下相應的策略節點的狀態標示數據 */
public void resume(String strategyName) throws Exception{
    ScheduleStrategy strategy = this.loadStrategy(strategyName);
    strategy.setSts(ScheduleStrategy.STS_RESUME);
    this.updateScheduleStrategy(strategy);      
}

改動節點數據,通過ZooKeeper的事件通知機制,讓運行機獲得變更通知。

4. 與其它開源調度框架對照

1)Quartz:Java其實的定時任務標準。

但Quartz關註點在於定時任務而非數據。並無一套依據數據處理而定制化的流程。

盡管Quartz能夠基於數據庫實現作業的高可用。缺少分布式並行運行作業的功能。

2)Crontab:Linux系統級的定時任務運行器。缺乏分布式和集中管理功能。

3)elastic-job:當當網近期開源項目,功能跟TBSchedule差點兒一樣(批鬥TBSchedule文檔缺失嚴重),一臺服務器僅僅能開啟一個任務實例,基於Ip不基於IpPort,單機難調試集群功能。

4)TBSchedule:淘寶早期開源。穩定性能夠保證。


第二部分 TBSchedule分布式調度演示樣例

1. TBSchedule源代碼下載

下載TBSchedule源代碼project,內容包含兩部分:project編譯成jar的任務開發依賴包和project編譯成war的調度控制臺。

2. 引入源代碼Demo開發演示樣例

當前演示樣例與Spring集成,源代碼可作為普通project依賴入任務project,也可將其打包成jar並引入依賴,此處版本號為3.2.2.2

補充:若打包失敗,請檢查編譯插件版本號及jdk編譯版本號。

任務project依賴

<dependency>
    <groupId>com.taobao.pamirs.schedule</groupId>
    <artifactId>tbschedule</artifactId>
    <version>3.3.3.2</version>
</dependency>

調度任務實現IScheduleTaskDealSingle。並實現selectTasksexecute方法,具體演示樣例:

Component("iScheduleTaskDealSingleTest")
public class IScheduleTaskDealSingleTest implements IScheduleTaskDealSingle<TaskModel> {

    private static final Logger LOG = LoggerFactory.getLogger(IScheduleTaskDealSingleTest.class);

    @Override
    public Comparator<TaskModel> getComparator() {
        return null;
    }

    @Override
    public List<TaskModel> selectTasks(String taskParameter, String ownSign, int taskQueueNum,
            List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception {

        LOG.info("IScheduleTaskDealSingleTest配置的參數,taskParameter:{}。ownSina:{}。taskQueueNum:{},taskItemList:{}, eachFetchDataNum:{}", taskParameter, ownSign, taskQueueNum, taskItemList, eachFetchDataNum);

        LOG.info("IScheduleTaskDealSingleTest選擇任務列表開始啦..........");
        List<TaskModel> models = new ArrayList<TaskModel>();
        models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest1"));
        models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest2"));

        return models;

    }

    @Override
    public boolean execute(TaskModel model, String ownSign) throws Exception {

        LOG.info("IScheduleTaskDealSingleTest運行開始啦.........." + new Date());
        System.out.println(model);
        return true;

    }

}

當中selectTasks方法獲取須要處理的列表(用集合裝著),循環集合中的元素並調用execute方法運行。子計時任務啟動,會直到獲取不到數據後才停止等待下一個子計時開始,參數後面具體介紹。

將調度任務註冊到zookeeper中心,spring中引入例如以下配置:

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName">
    <bean id="scheduleManagerFactory"
        class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
        init-method="init">
        <property name="zkConfig">
            <map>
                <entry key="zkConnectString" value="${schedule.zookeeper.address}" />
                <entry key="rootPath" value="${schedule.root.catalog}" />
                <entry key="zkSessionTimeout" value="${schedule.timeout}" />
                <entry key="userName" value="${schedule.username}" />
                <entry key="password" value="${schedule.password}" />
                <entry key="isCheckParentPath" value="true" />
            </map>
        </property>
    </bean>
</beans>

環境屬性配置文件添加例如以下配置:

#註冊中心地址
schedule.zookeeper.address=172.26.50.86:2181
#定時任務根文件夾。隨意指定,調度控制臺配置時相應
schedule.root.catalog=/tbschedule/example
#賬戶,隨意指定。調度控制臺配置時相應
schedule.username=username
#密碼,隨意指定,調度控制臺配置時相應
schedule.password=password
#超時配置
schedule.timeout=60000

啟動容器,iScheduleTaskDealSingleTest就完畢了到zookeeper中心的註冊。

補充:TBSchedule提供了IScheduleTaskDealSingle和IScheduleTaskDealMulti兩個接口,個人在測試中發現兩者除了execute方法上參數不同外,功能上並沒有別的不同,僅僅是語義上的區分,在處理模式為SLEEP下getComparator()沒實用。普通情況下,都是SLEEP模式。

3. 控制臺配置任務調度

將控制臺ScheduleConsole.war部署到tomcat容器。

補充:我通過ant運行源代碼中的build.xml構建控制臺。部署運行失敗(沒用過ant,眼下不知道原因),這樣的情況下:

使用方式一,直接用下載包中的控制臺部署就可以。


使用方式二,改動project配置打成war包,這靈活,還能夠自己定義改動,源文件不支持中文,可將編碼改成utf-8支持。

向註冊中心註冊配置(跟任務註冊用同一根文件夾,官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/config.jsp

配置調度任務(官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/index.jsp

4. selectTasks方法參數說明

taskParameter:相應控制臺自己定義參數,可自己定義傳入做邏輯上的操作

taskQueueNum:相應控制臺任務項數量

taskItemList:集合中TaskItemDefine的id值相應任務項值,多線程處理時,依據任務項協調數據一致性和完整性

eachFetchDataNum:相應控制臺每次獲取數量,因為子計時單元開始後,會不斷的去取數據進行處理,直到取不到數據子計時才停止,等待下一個子計時開始。

能夠限制每次取數。防止一次性數據記錄過大,內存不足。

ownSign:環境參數,一般沒什麽用

5. 創建調度策略參數說明

策略名稱:策略標示,可隨意填寫

任務類型:一般保持默認Schedule

任務名稱:相應任務欄被調度任務名稱

任務參數:一般不用,保持默認

單JVM最大線程組數量:單個JVM同意開啟的線程數

最大線程組數量:多處理機情況下的線程總數限制(總線程為2。任務項線程為4是沒有意義的)

IP地址127.0.0.1或者localhost會在全部機器上運行,註意多處理機若沒有依據任務子項劃分數據處理,會導致多處理機反復處理數據,慎重配置

創建演示樣例。官方wiki上有圖示。上面主要是各參數的具體含義。

6. 創建任務參數說明

任務名稱:策略調度的標示。一旦創建保存,不可更改

任務處理的SpringBean:註冊到spring的任務bean,如iScheduleTaskDealSingleTest

心跳頻率/假定服務死亡時間/處理模式/沒有數據時休眠時長/運行結束時間:一般保持默認就可以

線程數:處理該任務的線程數,在沒有劃分多任務項的情況下,多線程是沒有意義的,且線程數量大於任務項也是沒有意義的(線程數小於等於任務項)。註意假設開啟多線程,必須對數據做任務項過濾

單線程組最大任務項:配置單JVM處理的最大任務項數量,多任務項情況下。可按需限制,一般默認,多運行機會均衡分配

每次獲取數量:子計時單元開始。線程會不斷的去獲取數據(每次獲取的限制)並處理數據,直到獲取不到數據子計時才結束(方法內不用就能夠隨意配置)

每次運行數量://還沒測試過(可能是將獲取的數量拆分多次運行)

每次處理完休眠時間:子計時單元開始,僅僅要有數據,就會不停的獲取不停的處理,這個時間設置後,子計時單元開始每次獲取運行後。無論還有沒有待數據,都先歇會兒再獲取處理

自己定義參數:可自己定義控制任務邏輯操作

任務項:這項非常重要。在多線程情況下,劃分任務項是有意義的,可是要註意必須通過任務項參數,協調待處理數據。否則多線程會反復處理

創建演示樣例,官方wiki上有圖示。上面主要是各參數的具體含義。

分布式開源調度框架TBSchedule原理與應用