分散式開源排程框架TBSchedule原理與應用
主要內容:
第一部分 TBSchedule基本概念及原理
1. 概念介紹
2. 工作原理
3. 原始碼分析
4. 與其他開源排程框架對比
第二部分 TBSchedule分散式排程示例
1. TBSchedule原始碼下載
2. 引入原始碼Demo開發示例
3. 控制檯配置任務排程
4. selectTasks方法引數說明
5. 建立排程策略引數說明
6. 建立任務引數說明
第一部分 TBSchedule基本概念及原理
1. 概念介紹
TBSchedule是一個支援分散式的排程框架,能讓一種批量任務或者不斷變化的任務,被動態的分配到多個主機的JVM中,不同的執行緒組中並行執行。基於ZooKeeper的純Java實現,由Alibaba開源。
2. 工作原理
TBSchesule對分散式的支援包括排程機的分散式和執行機的分散式,其網路部署架構圖如下:
2.1 資料儲存
執行機和排程機均以ZooKeeper為註冊中心,所有資料以節點及節點內容的形式註冊,通過定時彙報主機狀態保持存活在ZooKeeper上。
首先看下執行機對ZooKeeper的使用配置檔案:
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName">
<bean id="scheduleManagerFactory"
class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method="init">
<property name="zkConfig">
<map>
<entry key="zkConnectString" value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.catalog}" />
<entry key="zkSessionTimeout" value="${schedule.timeout}" />
<entry key="userName" value="${schedule.username}" />
<entry key="password" value="${schedule.password}" />
<entry key="isCheckParentPath" value="true" />
</map>
</property>
</bean>
</beans>
1)執行機部署啟動,會在ZooKeeper上建立永久根節點schedule.zookeeper.address
,其後所有的操作均在該根節點下進行。
這裡以/ttest/creditjob
為根節點,檢視執行機註冊後情況:
[zk: 172.26.50.86:2181(CONNECTED) 28] ls /ttest/creditjob
[strategy, baseTaskType, factory]
[zk: 172.26.50.86:2181(CONNECTED) 29] ls /ttest/creditjob/factory
[127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000, 127.0.0.1$MIE-ZHANGTAO-D1$D826BC6565DC4D6CB85F7AE321EE51AE$0000000001]
可以看到根節點下面有3個永久子節點,strategy
儲存排程機建立的策略資訊,baseTaskType
儲存排程機建立的任務資訊,factory
儲存執行機註冊的主機資訊。每臺執行機啟動後,都會在factory
下建立一個臨時順序子節點
,該節點名是由TBSchedule原始碼生成的主機唯一表示。
根節點內容為當前TBSchedule內建版本號,可在程式修改,實際沒什麼意義。
[zk: 172.26.50.86:2181(CONNECTED) 17] get /ttest/creditjob
tbschedule-3.2.12
2)排程機部署啟動,這時不會對ZooKeeper節點做任何操作。開啟排程機配置面板:
配置好ZooKeeper接入點,點選管理主頁,進入排程任務管理面板:
輸入各項引數建立新任務後,此時會在baseTaskType
下面建立任務名稱永久子節點(排程機所有都宕機重啟後,仍能保持資料的完整性),而當前節點的內容就是配置的各項引數。
[zk: 172.26.50.86:2181(CONNECTED) 37] ls /ttest/creditjob/baseTaskType
[IScheduleTaskDealSingleTest]
[zk: 172.26.50.86:2181(CONNECTED) 39] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest
{"baseTaskType":"IScheduleTaskDealSingleTest","heartBeatRate":5000,"judgeDeadInterval":60000,"sleepTimeNoData":500,"sleepTimeInterval":0,"fetchDataNumber":500,"executeNumber":1,"threadNumber":1,"processorType":"SLEEP","permitRunStartTime":"0 * * * * ?","expireOwnSignInterval":1.0,"dealBeanName":"iScheduleTaskDealSingleTest","taskParameter":"0","taskKind":"static","taskItems":["0"],"maxTaskItemsOfOneThreadGroup":0,"version":0,"sts":"resume"}
3)建立排程策略,控制排程機排程狀態。
建立完成排程策略後開啟排程,此過程會在對應的任務節點strategy
下建立永久子節點
並寫入策略資料,在該子節點下建立表示排程機的臨時順序子節點
並寫入排程策略資料。
[zk: 172.26.50.86:2181(CONNECTED) 56] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest
{"strategyName":"IScheduleTaskDealSingleTest","IPList":["127.0.0.1"],"numOfSingleServer":0,"assignNum":1,"kind":"Schedule","taskName":"IScheduleTaskDealSingleTest","taskParameter":"0","sts":"resume"}
[zk: 172.26.50.86:2181(CONNECTED) 57] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest/127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000
{"strategyName":"IScheduleTaskDealSingleTest","uuid":"127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000","requestNum":1,"currentNum":0,"message":""}
同時會在baseTaskType/IScheduleTaskDealSingleTest
下建立下建立兩層永久子節點
並註冊排程主機資料。
[zk: 172.26.50.86:2181(CONNECTED) 45] ls /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest
[taskItem, server]
[zk: 172.26.50.86:2181(CONNECTED) 50] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/taskItem
IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000000,IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000001
[zk: 172.26.50.86:2181(CONNECTED) 51] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/server
reload=true
2.2 分散式高可用高效率保障
1)排程機的高可用有保障
,多排程機向註冊中心註冊後,共享排程任務,且同一排程任務僅由一臺排程機執行排程,當前排程機異常宕機後,其餘的排程機會接上。
2)執行機的高可用有保障
,多執行機向註冊中心註冊後,配置執行機單執行緒(多機匯流排程為1)執行任務,排程機會隨機啟動一臺執行機執行,當前執行異常機宕機後,排程機會會新排程一臺執行機。
3)執行機的並行高效保障
,配置執行機多執行緒且劃分多工子項後,各任務子項均衡分配到所有執行機,各執行機均執行,多執行緒資料一致性協調由任務項引數區分。
4)彈性擴充套件失效轉移保障
,執行中的執行機宕機,或新增執行機,排程機將在下次任務執行前重新分配任務項,不影響正常執行機任務(崩潰的執行機當前任務處理失效);執行中的排程機宕機或動態新增排程機,不影響執行機當前任務,排程機宕機後動態切換。
3. 原始碼分析
3.1 執行機註冊節選
從Spring配置檔案可以看到,執行機註冊的入口在TBScheduleManagerFactory
的init
方法,程式碼片段:
public class TBScheduleManagerFactory implements ApplicationContextAware {
public void init() throws Exception {
Properties properties = new Properties();
for(Map.Entry<String,String> e: this.zkConfig.entrySet()){
properties.put(e.getKey(),e.getValue());
}
this.init(properties);
}
public void init(Properties p) throws Exception {
if(this.initialThread != null){
this.initialThread.stopThread();
}
this.lock.lock();
try{
this.scheduleDataManager = null;
this.scheduleStrategyManager = null;
ConsoleManager.setScheduleManagerFactory(this);
if(this.zkManager != null){
this.zkManager.close();
}
this.zkManager = new ZKManager(p);
this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr();
initialThread = new InitialThread(this);
initialThread.setName("TBScheduleManagerFactory-initialThread");
initialThread.start();
}finally{
this.lock.unlock();
}
}
}
init
方法將配置引數封裝到Properties
物件後開始初始化,連線上ZooKeeper並啟動一個新的執行緒進行節點資料處理。
this.zkManager = new ZKManager(p);
...
initialThread = new InitialThread(this);
initialThread.start();
跟蹤程式碼可以看到新執行緒呼叫的實際處理方法是:
public void initialData() throws Exception{
/** 遞迴建立永久根節點(/ttest/creditjob)並寫入版本資訊 */
this.zkManager.initial();
/** 建立永久子節點 baseTaskType */
this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
/** 建立永久子節點 strategy 和 factory */
this.scheduleStrategyManager = new ScheduleStrategyDataManager4ZK(this.zkManager);
if (this.start == true) {
/** 註冊排程管理器,建立臨時順序子節點,節點表示主機的註冊資訊 */
this.scheduleStrategyManager.registerManagerFactory(this);
if(timer == null){
timer = new Timer("TBScheduleManagerFactory-Timer");
}
if(timerTask == null){
/** 啟動一個定時器檢測ZooKeeper狀態,如果連線失敗,關閉所有的任務後,重新連線Zookeeper伺服器 */
timerTask = new ManagerFactoryTimerTask(this);
timer.schedule(timerTask, 2000,this.timerInterval);
}
}
}
上述幾個節點建立完成,並向ZooKeeper註冊監聽,當有資料變化時獲得通知(任務執行/暫停)。到這裡,就完成了執行機到ZooKeeper的註冊監聽過程。
3.2 排程任務建立節選
任務建立提交儲存為入口,將引數封裝到ScheduleTaskType
物件中,呼叫節點建立和更新方法:
//taskTypeEdit.jsp->taskTypeDeal.jsp
if(action.equalsIgnoreCase("createTaskType")){
ConsoleManager.getScheduleDataManager().createBaseTaskType(taskType);
result = "任務" + baseTaskType + "建立成功!!!!";
}else{
ConsoleManager.getScheduleDataManager().updateBaseTaskType(taskType);
result = "任務" + baseTaskType + "修改成功!!!!";
}
真正執行任務節點及資料處理的程式碼段:
//ScheduleDataManager4ZK.java
public void createBaseTaskType(ScheduleTaskType baseTaskType) throws Exception {
if(baseTaskType.getBaseTaskType().indexOf("$") > 0){
throw new Exception("排程任務" + baseTaskType.getBaseTaskType() +"名稱不能包括特殊字元 $");
}
/** 在 baseTaskType 節點下建立任務永久節點並寫入節點內容為任務配置引數 */
String zkPath = this.PATH_BaseTaskType + "/"+ baseTaskType.getBaseTaskType();
String valueString = this.gson.toJson(baseTaskType);
if ( this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
} else {
throw new Exception("排程任務" + baseTaskType.getBaseTaskType() + "已經存在,如果確認需要重建,請先呼叫deleteTaskType(String baseTaskType)刪除");
}
}
如果是更新的話,就不會再建立任務永久節點了,直接修改任務節點內容即可。
3.3 策略建立節選
策略建立提交儲存為入口,將引數封裝到ScheduleStrategy
物件中,呼叫節點建立和更新方法:
//scheduleStrategyEdit.jsp->scheduleStrategyDeal.jsp
if (action.equalsIgnoreCase("createScheduleStrategy")) {
ConsoleManager.getScheduleStrategyManager().createScheduleStrategy(scheduleStrategy);
isRefreshParent = true;
} else if (action.equalsIgnoreCase("editScheduleStrategy")) {
ConsoleManager.getScheduleStrategyManager().updateScheduleStrategy(scheduleStrategy);
isRefreshParent = true;
}
真正執行任務節點及資料處理的程式碼段:
//ScheduleStrategyDataManager4ZK.java
public void createScheduleStrategy(ScheduleStrategy scheduleStrategy) throws Exception {
String zkPath = this.PATH_Strategy + "/"+ scheduleStrategy.getStrategyName();
/** 在 strategy 節點下建立任務永久節點並寫入節點內容為任務配置引數 */
String valueString = this.gson.toJson(scheduleStrategy);
if ( this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
} else {
throw new Exception("排程策略" + scheduleStrategy.getStrategyName() + "已經存在,如果確認需要重建,請先呼叫deleteMachineStrategy(String taskType)刪除");
}
}
如果是更新的話,就不會再建立任務永久節點了,直接修改任務節點內容即可。
3.4 排程控制節選
策略排程控制程式碼段:
//scheduleStrategyList.jsp->scheduleStrategyDeal.jsp
else if (action.equalsIgnoreCase("deleteScheduleStrategy")) {
ConsoleManager.getScheduleStrategyManager()
.deleteMachineStrategy(
scheduleStrategy.getStrategyName());
isRefreshParent = true;
} else if (action.equalsIgnoreCase("pauseTaskType")) {
ConsoleManager.getScheduleStrategyManager().pause(
scheduleStrategy.getStrategyName());
isRefreshParent = true;
} else if (action.equalsIgnoreCase("resumeTaskType")) {
ConsoleManager.getScheduleStrategyManager().resume(
scheduleStrategy.getStrategyName());
isRefreshParent = true;
}
真正執行任務節點及資料處理的程式碼段:
//ScheduleStrategyDataManager4ZK.java
/** 策略刪除,即刪除strategy下對應的策略節點及資料 */
public void deleteMachineStrategy(String taskType) throws Exception {
deleteMachineStrategy(taskType,false);
}
/** 排程暫停,即修改strategy下對應的策略節點的狀態標示資料 */
public void pause(String strategyName) throws Exception{
ScheduleStrategy strategy = this.loadStrategy(strategyName);
strategy.setSts(ScheduleStrategy.STS_PAUSE);
this.updateScheduleStrategy(strategy);
}
/** 排程啟動,即修改strategy下對應的策略節點的狀態標示資料 */
public void resume(String strategyName) throws Exception{
ScheduleStrategy strategy = this.loadStrategy(strategyName);
strategy.setSts(ScheduleStrategy.STS_RESUME);
this.updateScheduleStrategy(strategy);
}
修改節點資料,通過ZooKeeper的事件通知機制,讓執行機獲得變更通知。
4. 與其他開源排程框架對比
1)Quartz
:Java事實上的定時任務標準。但Quartz關注點在於定時任務而非資料,並無一套根據資料處理而定製化的流程。雖然Quartz可以基於資料庫實現作業的高可用,缺少分散式並行執行作業的功能。
2)Crontab
:Linux系統級的定時任務執行器,缺乏分散式和集中管理功能。
3)elastic-job
:噹噹網最近開源專案,功能跟TBSchedule幾乎一樣(批鬥TBSchedule文件缺失嚴重),一臺伺服器只能開啟一個任務例項,基於Ip不基於IpPort,單機難除錯叢集功能。
4)TBSchedule
:淘寶早期開源,穩定性可以保證。
第二部分 TBSchedule分散式排程示例
1. TBSchedule原始碼下載
下載TBSchedule原始碼工程,內容包括兩部分:工程編譯成jar
的任務開發依賴包和工程編譯成war
的排程控制檯。
2. 引入原始碼Demo開發示例
當前示例與Spring整合,原始碼可作為普通工程依賴入任務工程,也可將其打包成jar
並引入依賴,此處版本為3.2.2.2
。
補充
:若打包失敗,請檢查編譯外掛版本及jdk編譯版本。
任務工程依賴
<dependency>
<groupId>com.taobao.pamirs.schedule</groupId>
<artifactId>tbschedule</artifactId>
<version>3.3.3.2</version>
</dependency>
排程任務實現IScheduleTaskDealSingle
,並實現selectTasks
,execute
方法,詳細示例:
Component("iScheduleTaskDealSingleTest")
public class IScheduleTaskDealSingleTest implements IScheduleTaskDealSingle<TaskModel> {
private static final Logger LOG = LoggerFactory.getLogger(IScheduleTaskDealSingleTest.class);
@Override
public Comparator<TaskModel> getComparator() {
return null;
}
@Override
public List<TaskModel> selectTasks(String taskParameter, String ownSign, int taskQueueNum,
List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception {
LOG.info("IScheduleTaskDealSingleTest配置的引數,taskParameter:{},ownSina:{},taskQueueNum:{},taskItemList:{}, eachFetchDataNum:{}", taskParameter, ownSign, taskQueueNum, taskItemList, eachFetchDataNum);
LOG.info("IScheduleTaskDealSingleTest選擇任務列表開始啦..........");
List<TaskModel> models = new ArrayList<TaskModel>();
models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest1"));
models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest2"));
return models;
}
@Override
public boolean execute(TaskModel model, String ownSign) throws Exception {
LOG.info("IScheduleTaskDealSingleTest執行開始啦.........." + new Date());
System.out.println(model);
return true;
}
}
其中selectTasks
方法獲取需要處理的列表(用集合裝著),迴圈集合中的元素並呼叫execute
方法執行。子計時任務啟動,會直到獲取不到資料後才停止等待下一個子計時開始,引數後面詳細介紹。
將排程任務註冊到zookeeper
中心,spring
中引入如下配置:
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName">
<bean id="scheduleManagerFactory"
class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method="init">
<property name="zkConfig">
<map>
<entry key="zkConnectString" value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.catalog}" />
<entry key="zkSessionTimeout" value="${schedule.timeout}" />
<entry key="userName" value="${schedule.username}" />
<entry key="password" value="${schedule.password}" />
<entry key="isCheckParentPath" value="true" />
</map>
</property>
</bean>
</beans>
環境屬性配置檔案增加如下配置:
#註冊中心地址
schedule.zookeeper.address=172.26.50.86:2181
#定時任務根目錄,任意指定,排程控制檯配置時對應
schedule.root.catalog=/tbschedule/example
#賬戶,任意指定,排程控制檯配置時對應
schedule.username=username
#密碼,任意指定,排程控制檯配置時對應
schedule.password=password
#超時配置
schedule.timeout=60000
啟動容器,iScheduleTaskDealSingleTest
就完成了到zookeeper
中心的註冊。
補充
:TBSchedule提供了IScheduleTaskDealSingle和IScheduleTaskDealMulti兩個介面,個人在測試中發現兩者除了execute方法上引數不同外,功能上並沒有別的不同,只是語義上的區分,在處理模式為SLEEP下getComparator()沒有用,一般情況下,都是SLEEP模式。
3. 控制檯配置任務排程
將控制檯ScheduleConsole.war
部署到tomcat
容器。
補充
:我通過ant
執行原始碼中的build.xml
構建控制檯,部署執行失敗(沒用過ant,目前不知道原因),這種情況下:使用方式一,直接用下載包中的控制檯部署即可。
使用方式二,修改工程配置打成war
包,這靈活,還可以自定義修改,原始檔不支援中文,可將編碼改成utf-8
支援。
向註冊中心註冊配置(跟任務註冊用同一根目錄,官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/config.jsp
配置排程任務(官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/index.jsp
4. selectTasks方法引數說明
taskParameter
:對應控制檯自定義引數
,可自定義傳入做邏輯上的操作
taskQueueNum
:對應控制檯任務項
數量
taskItemList
:集合中TaskItemDefine
的id值對應任務項
值,多執行緒處理時,根據任務項協調資料一致性和完整性
eachFetchDataNum
:對應控制檯每次獲取數量
,由於子計時單元開始後,會不斷的去取資料進行處理,直到取不到資料子計時才停止,等待下一個子計時開始。可以限制每次取數,防止一次性資料記錄過大,記憶體不足。
ownSign
:環境引數,一般沒什麼用
5. 建立排程策略引數說明
策略名稱
:策略標示,可任意填寫
任務型別
:一般保持預設Schedule
任務名稱
:對應工作列被排程任務名稱
任務引數
:一般不用,保持預設
單JVM最大執行緒組數量
:單個JVM允許開啟的執行緒數
最大執行緒組數量
:多處理機情況下的執行緒總數限制(匯流排程為2,任務項執行緒為4是沒有意義的)
IP地址
:127.0.0.1
或者localhost
會在所有機器上執行,注意多處理機若沒有根據任務子項劃分資料處理,會導致多處理機重複處理資料,謹慎配置
建立示例,官方wiki上有圖示,上面主要是各引數的具體含義。
6. 建立任務引數說明
任務名稱
:策略排程的標示,一旦建立儲存,不可更改
任務處理的SpringBean
:註冊到spring的任務bean,如iScheduleTaskDealSingleTest
心跳頻率
/假定服務死亡時間
/處理模式
/沒有資料時休眠時長
/執行結束時間
:一般保持預設即可
執行緒數
:處理該任務的執行緒數,在沒有劃分多任務項
的情況下,多執行緒是沒有意義的,且執行緒數量大於任務項
也是沒有意義的(執行緒數小於等於任務項),注意如果開啟多執行緒,必須對資料做任務項過濾
單執行緒組最大任務項
:配置單JVM處理的最大任務項數量,多任務項
情況下,可按需限制,一般預設,多執行機會均衡分配
每次獲取數量
:子計時單元開始,執行緒會不斷的去獲取資料(每次獲取的限制)並處理資料,直到獲取不到資料子計時才結束(方法內不用就可以隨意配置)
每次執行數量
://還沒測試過(可能是將獲取的數量拆分多次執行)
每次處理完休眠時間
:子計時單元開始,只要有資料,就會不停的獲取不停的處理,這個時間設定後,子計時單元開始每次獲取執行後,不管還有沒有待資料,都先歇會兒再獲取處理
自定義引數
:可自定義控制任務邏輯操作
任務項
:這項很重要,在多執行緒情況下,劃分任務項是有意義的,但是要注意必須通過任務項引數,協調待處理資料,否則多執行緒會重複處理
建立示例,官方wiki上有圖示,上面主要是各引數的具體含義。