分布式開源調度框架TBSchedule原理與應用
主要內容:
第一部分 TBSchedule基本概念及原理
1. 概念介紹
2. 工作原理
3. 源代碼分析
4. 與其它開源調度框架對照
第二部分 TBSchedule分布式調度演示樣例
1. TBSchedule源代碼下載
2. 引入源代碼Demo開發演示樣例
3. 控制臺配置任務調度
4. selectTasks方法參數說明
5. 創建調度策略參數說明
6. 創建任務參數說明
第一部分 TBSchedule基本概念及原理
1. 概念介紹
TBSchedule是一個支持分布式的調度框架。能讓一種批量任務或者不斷變化的任務,被動態的分配到多個主機的JVM中,不同的線程組中並行運行。基於ZooKeeper的純Java實現,由Alibaba開源。
2. 工作原理
TBSchesule對分布式的支持包含調度機的分布式和運行機的分布式,其網絡部署架構圖例如以下:
2.1 數據存儲
運行機和調度機均以ZooKeeper為註冊中心,全部數據以節點及節點內容的形式註冊,通過定時匯報主機狀態保持存活在ZooKeeper上。
首先看下運行機對ZooKeeper的使用配置文件:
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName">
<bean id="scheduleManagerFactory"
class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method="init">
<property name="zkConfig">
<map>
<entry key="zkConnectString" value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.catalog}" />
<entry key="zkSessionTimeout" value="${schedule.timeout}" />
<entry key="userName" value="${schedule.username}" />
<entry key="password" value="${schedule.password}" />
<entry key="isCheckParentPath" value="true" />
</map>
</property>
</bean>
</beans>
1)運行機部署啟動,會在ZooKeeper上創建永久根節點schedule.zookeeper.address
,其後全部的操作均在該根節點下進行。
這裏以/ttest/creditjob
為根節點。查看運行機註冊後情況:
[zk: 172.26.50.86:2181(CONNECTED) 28] ls /ttest/creditjob
[strategy, baseTaskType, factory]
[zk: 172.26.50.86:2181(CONNECTED) 29] ls /ttest/creditjob/factory
[127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000, 127.0.0.1$MIE-ZHANGTAO-D1$D826BC6565DC4D6CB85F7AE321EE51AE$0000000001]
能夠看到根節點以下有3個永久子節點,strategy
存儲調度機創建的策略信息,baseTaskType
存儲調度機創建的任務信息,factory
存儲運行機註冊的主機信息。每臺運行機啟動後。都會在factory
下創建一個暫時順序子節點
,該節點名是由TBSchedule源代碼生成的主機唯一表示。
根節點內容為當前TBSchedule內置版本號號。可在程序改動,實際沒什麽意義。
[zk: 172.26.50.86:2181(CONNECTED) 17] get /ttest/creditjob
tbschedule-3.2.12
2)調度機部署啟動,這時不會對ZooKeeper節點做不論什麽操作。打開調度機配置面板:
配置好ZooKeeper接入點,點擊管理主頁。進入調度任務管理面板:
輸入各項參數創建新任務後,此時會在baseTaskType
以下創建任務名稱永久子節點(調度機全部都宕機重新啟動後。仍能保持數據的完整性)。而當前節點的內容就是配置的各項參數。
[zk: 172.26.50.86:2181(CONNECTED) 37] ls /ttest/creditjob/baseTaskType
[IScheduleTaskDealSingleTest]
[zk: 172.26.50.86:2181(CONNECTED) 39] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest
{"baseTaskType":"IScheduleTaskDealSingleTest","heartBeatRate":5000,"judgeDeadInterval":60000,"sleepTimeNoData":500,"sleepTimeInterval":0,"fetchDataNumber":500,"executeNumber":1,"threadNumber":1,"processorType":"SLEEP","permitRunStartTime":"0 * * * * ?","expireOwnSignInterval":1.0,"dealBeanName":"iScheduleTaskDealSingleTest","taskParameter":"0","taskKind":"static","taskItems":["0"],"maxTaskItemsOfOneThreadGroup":0,"version":0,"sts":"resume"}
3)創建調度策略,控制調度機調度狀態。
創建完畢調度策略後開啟調度,此過程會在相應的任務節點strategy
下創建永久子節點
並寫入策略數據。在該子節點下創建表示調度機的暫時順序子節點
並寫入調度策略數據。
[zk: 172.26.50.86:2181(CONNECTED) 56] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest
{"strategyName":"IScheduleTaskDealSingleTest","IPList":["127.0.0.1"],"numOfSingleServer":0,"assignNum":1,"kind":"Schedule","taskName":"IScheduleTaskDealSingleTest","taskParameter":"0","sts":"resume"}
[zk: 172.26.50.86:2181(CONNECTED) 57] get /ttest/creditjob/strategy/IScheduleTaskDealSingleTest/127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000
{"strategyName":"IScheduleTaskDealSingleTest","uuid":"127.0.0.1$MIE-ZHANGTAO-D1$9D3029EC0C574403B6CFD0C146644A77$0000000000","requestNum":1,"currentNum":0,"message":""}
同一時候會在baseTaskType/IScheduleTaskDealSingleTest
下創建下創建兩層永久子節點
並註冊調度主機數據。
[zk: 172.26.50.86:2181(CONNECTED) 45] ls /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest
[taskItem, server]
[zk: 172.26.50.86:2181(CONNECTED) 50] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/taskItem
IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000000,IScheduleTaskDealSingleTest$127.0.0.1$4E8008EE18334564BE1E31C7C0D55296$0000000001
[zk: 172.26.50.86:2181(CONNECTED) 51] get /ttest/creditjob/baseTaskType/IScheduleTaskDealSingleTest/IScheduleTaskDealSingleTest/server
reload=true
2.2 分布式高可用高效率保障
1)調度機的高可用有保障
,多調度機向註冊中心註冊後,共享調度任務。且同一調度任務僅由一臺調度機運行調度。當前調度機異常宕機後。其余的調度機會接上。
2)運行機的高可用有保障
,多運行機向註冊中心註冊後,配置運行機單線程(多機總線程為1)運行任務。調度機會隨機啟動一臺運行機運行。當前運行異常機宕機後。調度機會會新調度一臺運行機。
3)運行機的並行高效保障
,配置運行機多線程且劃分多任務子項後,各任務子項均衡分配到全部運行機,各運行機均運行,多線程數據一致性協調由任務項參數區分。
4)彈性擴展失效轉移保障
,運行中的運行機宕機,或新增運行機。調度機將在下次任務運行前又一次分配任務項,不影響正常運行機任務(崩潰的運行機當前任務處理失效)。運行中的調度機宕機或動態新增調度機,不影響運行機當前任務,調度機宕機後動態切換。
3. 源代碼分析
3.1 運行機註冊節選
從Spring配置文件能夠看到,運行機註冊的入口在TBScheduleManagerFactory
的init
方法,代碼片段:
public class TBScheduleManagerFactory implements ApplicationContextAware {
public void init() throws Exception {
Properties properties = new Properties();
for(Map.Entry<String,String> e: this.zkConfig.entrySet()){
properties.put(e.getKey(),e.getValue());
}
this.init(properties);
}
public void init(Properties p) throws Exception {
if(this.initialThread != null){
this.initialThread.stopThread();
}
this.lock.lock();
try{
this.scheduleDataManager = null;
this.scheduleStrategyManager = null;
ConsoleManager.setScheduleManagerFactory(this);
if(this.zkManager != null){
this.zkManager.close();
}
this.zkManager = new ZKManager(p);
this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr();
initialThread = new InitialThread(this);
initialThread.setName("TBScheduleManagerFactory-initialThread");
initialThread.start();
}finally{
this.lock.unlock();
}
}
}
init
方法將配置參數封裝到Properties
對象後開始初始化,連接上ZooKeeper並啟動一個新的線程進行節點數據處理。
this.zkManager = new ZKManager(p);
...
initialThread = new InitialThread(this);
initialThread.start();
跟蹤代碼能夠看到新線程調用的實際處理方法是:
public void initialData() throws Exception{
/** 遞歸創建永久根節點(/ttest/creditjob)並寫入版本號信息 */
this.zkManager.initial();
/** 創建永久子節點 baseTaskType */
this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
/** 創建永久子節點 strategy 和 factory */
this.scheduleStrategyManager = new ScheduleStrategyDataManager4ZK(this.zkManager);
if (this.start == true) {
/** 註冊調度管理器,創建暫時順序子節點。節點表示主機的註冊信息 */
this.scheduleStrategyManager.registerManagerFactory(this);
if(timer == null){
timer = new Timer("TBScheduleManagerFactory-Timer");
}
if(timerTask == null){
/** 啟動一個定時器檢測ZooKeeper狀態,假設連接失敗,關閉全部的任務後,又一次連接Zookeeper服務器 */
timerTask = new ManagerFactoryTimerTask(this);
timer.schedule(timerTask, 2000,this.timerInterval);
}
}
}
上述幾個節點創建完畢,並向ZooKeeper註冊監聽,當有數據變化時獲得通知(任務運行/暫停)。到這裏。就完畢了運行機到ZooKeeper的註冊監聽過程。
3.2 調度任務創建節選
任務創建提交保存為入口。將參數封裝到ScheduleTaskType
對象中,調用節點創建和更新方法:
//taskTypeEdit.jsp->taskTypeDeal.jsp
if(action.equalsIgnoreCase("createTaskType")){
ConsoleManager.getScheduleDataManager().createBaseTaskType(taskType);
result = "任務" + baseTaskType + "創建成功!!!
!
"
;
}else{
ConsoleManager.getScheduleDataManager().updateBaseTaskType(taskType);
result = "任務" + baseTaskType + "改動成功!!
!。"
;
}
真正運行任務節點及數據處理的代碼段:
//ScheduleDataManager4ZK.java
public void createBaseTaskType(ScheduleTaskType baseTaskType) throws Exception {
if(baseTaskType.getBaseTaskType().indexOf("$") > 0){
throw new Exception("調度任務" + baseTaskType.getBaseTaskType() +"名稱不能包含特殊字符 $");
}
/** 在 baseTaskType 節點下創建任務永久節點並寫入節點內容為任務配置參數 */
String zkPath = this.PATH_BaseTaskType + "/"+ baseTaskType.getBaseTaskType();
String valueString = this.gson.toJson(baseTaskType);
if ( this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
} else {
throw new Exception("調度任務" + baseTaskType.getBaseTaskType() + "已經存在,假設確認須要重建,請先調用deleteTaskType(String baseTaskType)刪除");
}
}
假設是更新的話,就不會再創建任務永久節點了,直接改動任務節點內容就可以。
3.3 策略創建節選
策略創建提交保存為入口,將參數封裝到ScheduleStrategy
對象中。調用節點創建和更新方法:
//scheduleStrategyEdit.jsp->scheduleStrategyDeal.jsp
if (action.equalsIgnoreCase("createScheduleStrategy")) {
ConsoleManager.getScheduleStrategyManager().createScheduleStrategy(scheduleStrategy);
isRefreshParent = true;
} else if (action.equalsIgnoreCase("editScheduleStrategy")) {
ConsoleManager.getScheduleStrategyManager().updateScheduleStrategy(scheduleStrategy);
isRefreshParent = true;
}
真正運行任務節點及數據處理的代碼段:
//ScheduleStrategyDataManager4ZK.java
public void createScheduleStrategy(ScheduleStrategy scheduleStrategy) throws Exception {
String zkPath = this.PATH_Strategy + "/"+ scheduleStrategy.getStrategyName();
/** 在 strategy 節點下創建任務永久節點並寫入節點內容為任務配置參數 */
String valueString = this.gson.toJson(scheduleStrategy);
if ( this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, valueString.getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
} else {
throw new Exception("調度策略" + scheduleStrategy.getStrategyName() + "已經存在,假設確認須要重建,請先調用deleteMachineStrategy(String taskType)刪除");
}
}
假設是更新的話,就不會再創建任務永久節點了,直接改動任務節點內容就可以。
3.4 調度控制節選
策略調度控制代碼段:
//scheduleStrategyList.jsp->scheduleStrategyDeal.jsp
else if (action.equalsIgnoreCase("deleteScheduleStrategy")) {
ConsoleManager.getScheduleStrategyManager()
.deleteMachineStrategy(
scheduleStrategy.getStrategyName());
isRefreshParent = true;
} else if (action.equalsIgnoreCase("pauseTaskType")) {
ConsoleManager.getScheduleStrategyManager().pause(
scheduleStrategy.getStrategyName());
isRefreshParent = true;
} else if (action.equalsIgnoreCase("resumeTaskType")) {
ConsoleManager.getScheduleStrategyManager().resume(
scheduleStrategy.getStrategyName());
isRefreshParent = true;
}
真正運行任務節點及數據處理的代碼段:
//ScheduleStrategyDataManager4ZK.java
/** 策略刪除,即刪除strategy下相應的策略節點及數據 */
public void deleteMachineStrategy(String taskType) throws Exception {
deleteMachineStrategy(taskType,false);
}
/** 調度暫停,即改動strategy下相應的策略節點的狀態標示數據 */
public void pause(String strategyName) throws Exception{
ScheduleStrategy strategy = this.loadStrategy(strategyName);
strategy.setSts(ScheduleStrategy.STS_PAUSE);
this.updateScheduleStrategy(strategy);
}
/** 調度啟動,即改動strategy下相應的策略節點的狀態標示數據 */
public void resume(String strategyName) throws Exception{
ScheduleStrategy strategy = this.loadStrategy(strategyName);
strategy.setSts(ScheduleStrategy.STS_RESUME);
this.updateScheduleStrategy(strategy);
}
改動節點數據,通過ZooKeeper的事件通知機制,讓運行機獲得變更通知。
4. 與其它開源調度框架對照
1)Quartz
:Java其實的定時任務標準。
但Quartz關註點在於定時任務而非數據。並無一套依據數據處理而定制化的流程。
盡管Quartz能夠基於數據庫實現作業的高可用。缺少分布式並行運行作業的功能。
2)Crontab
:Linux系統級的定時任務運行器。缺乏分布式和集中管理功能。
3)elastic-job
:當當網近期開源項目,功能跟TBSchedule差點兒一樣(批鬥TBSchedule文檔缺失嚴重),一臺服務器僅僅能開啟一個任務實例,基於Ip不基於IpPort,單機難調試集群功能。
4)TBSchedule
:淘寶早期開源。穩定性能夠保證。
第二部分 TBSchedule分布式調度演示樣例
1. TBSchedule源代碼下載
下載TBSchedule源代碼project,內容包含兩部分:project編譯成jar
的任務開發依賴包和project編譯成war
的調度控制臺。
2. 引入源代碼Demo開發演示樣例
當前演示樣例與Spring集成,源代碼可作為普通project依賴入任務project,也可將其打包成jar
並引入依賴,此處版本號為3.2.2.2
。
補充
:若打包失敗,請檢查編譯插件版本號及jdk編譯版本號。
任務project依賴
<dependency>
<groupId>com.taobao.pamirs.schedule</groupId>
<artifactId>tbschedule</artifactId>
<version>3.3.3.2</version>
</dependency>
調度任務實現IScheduleTaskDealSingle
。並實現selectTasks
。execute
方法,具體演示樣例:
Component("iScheduleTaskDealSingleTest")
public class IScheduleTaskDealSingleTest implements IScheduleTaskDealSingle<TaskModel> {
private static final Logger LOG = LoggerFactory.getLogger(IScheduleTaskDealSingleTest.class);
@Override
public Comparator<TaskModel> getComparator() {
return null;
}
@Override
public List<TaskModel> selectTasks(String taskParameter, String ownSign, int taskQueueNum,
List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception {
LOG.info("IScheduleTaskDealSingleTest配置的參數,taskParameter:{}。ownSina:{}。taskQueueNum:{},taskItemList:{}, eachFetchDataNum:{}", taskParameter, ownSign, taskQueueNum, taskItemList, eachFetchDataNum);
LOG.info("IScheduleTaskDealSingleTest選擇任務列表開始啦..........");
List<TaskModel> models = new ArrayList<TaskModel>();
models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest1"));
models.add(new TaskModel(String.valueOf(System.currentTimeMillis()), "taosirTest2"));
return models;
}
@Override
public boolean execute(TaskModel model, String ownSign) throws Exception {
LOG.info("IScheduleTaskDealSingleTest運行開始啦.........." + new Date());
System.out.println(model);
return true;
}
}
當中selectTasks
方法獲取須要處理的列表(用集合裝著),循環集合中的元素並調用execute
方法運行。子計時任務啟動,會直到獲取不到數據後才停止等待下一個子計時開始,參數後面具體介紹。
將調度任務註冊到zookeeper
中心,spring
中引入例如以下配置:
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans default-autowire="byName">
<bean id="scheduleManagerFactory"
class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method="init">
<property name="zkConfig">
<map>
<entry key="zkConnectString" value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.catalog}" />
<entry key="zkSessionTimeout" value="${schedule.timeout}" />
<entry key="userName" value="${schedule.username}" />
<entry key="password" value="${schedule.password}" />
<entry key="isCheckParentPath" value="true" />
</map>
</property>
</bean>
</beans>
環境屬性配置文件添加例如以下配置:
#註冊中心地址
schedule.zookeeper.address=172.26.50.86:2181
#定時任務根文件夾。隨意指定,調度控制臺配置時相應
schedule.root.catalog=/tbschedule/example
#賬戶,隨意指定。調度控制臺配置時相應
schedule.username=username
#密碼,隨意指定,調度控制臺配置時相應
schedule.password=password
#超時配置
schedule.timeout=60000
啟動容器,iScheduleTaskDealSingleTest
就完畢了到zookeeper
中心的註冊。
補充
:TBSchedule提供了IScheduleTaskDealSingle和IScheduleTaskDealMulti兩個接口,個人在測試中發現兩者除了execute方法上參數不同外,功能上並沒有別的不同,僅僅是語義上的區分,在處理模式為SLEEP下getComparator()沒實用。普通情況下,都是SLEEP模式。
3. 控制臺配置任務調度
將控制臺ScheduleConsole.war
部署到tomcat
容器。
補充
:我通過ant
運行源代碼中的build.xml
構建控制臺。部署運行失敗(沒用過ant,眼下不知道原因),這樣的情況下:使用方式一,直接用下載包中的控制臺部署就可以。
使用方式二,改動project配置打成war
包,這靈活,還能夠自己定義改動,源文件不支持中文,可將編碼改成utf-8
支持。
向註冊中心註冊配置(跟任務註冊用同一根文件夾,官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/config.jsp
配置調度任務(官方wiki圖示)
http://{server}:{port}/ScheduleConsole/schedule/index.jsp
4. selectTasks方法參數說明
taskParameter
:相應控制臺自己定義參數
,可自己定義傳入做邏輯上的操作
taskQueueNum
:相應控制臺任務項
數量
taskItemList
:集合中TaskItemDefine
的id值相應任務項
值,多線程處理時,依據任務項協調數據一致性和完整性
eachFetchDataNum
:相應控制臺每次獲取數量
,因為子計時單元開始後,會不斷的去取數據進行處理,直到取不到數據子計時才停止,等待下一個子計時開始。
能夠限制每次取數。防止一次性數據記錄過大,內存不足。
ownSign
:環境參數,一般沒什麽用
5. 創建調度策略參數說明
策略名稱
:策略標示,可隨意填寫
任務類型
:一般保持默認Schedule
任務名稱
:相應任務欄被調度任務名稱
任務參數
:一般不用,保持默認
單JVM最大線程組數量
:單個JVM同意開啟的線程數
最大線程組數量
:多處理機情況下的線程總數限制(總線程為2。任務項線程為4是沒有意義的)
IP地址
:127.0.0.1
或者localhost
會在全部機器上運行,註意多處理機若沒有依據任務子項劃分數據處理,會導致多處理機反復處理數據,慎重配置
創建演示樣例。官方wiki上有圖示。上面主要是各參數的具體含義。
6. 創建任務參數說明
任務名稱
:策略調度的標示。一旦創建保存,不可更改
任務處理的SpringBean
:註冊到spring的任務bean,如iScheduleTaskDealSingleTest
心跳頻率
/假定服務死亡時間
/處理模式
/沒有數據時休眠時長
/運行結束時間
:一般保持默認就可以
線程數
:處理該任務的線程數,在沒有劃分多任務項
的情況下,多線程是沒有意義的,且線程數量大於任務項
也是沒有意義的(線程數小於等於任務項)。註意假設開啟多線程,必須對數據做任務項過濾
單線程組最大任務項
:配置單JVM處理的最大任務項數量,多任務項
情況下。可按需限制,一般默認,多運行機會均衡分配
每次獲取數量
:子計時單元開始。線程會不斷的去獲取數據(每次獲取的限制)並處理數據,直到獲取不到數據子計時才結束(方法內不用就能夠隨意配置)
每次運行數量
://還沒測試過(可能是將獲取的數量拆分多次運行)
每次處理完休眠時間
:子計時單元開始,僅僅要有數據,就會不停的獲取不停的處理,這個時間設置後,子計時單元開始每次獲取運行後。無論還有沒有待數據,都先歇會兒再獲取處理
自己定義參數
:可自己定義控制任務邏輯操作
任務項
:這項非常重要。在多線程情況下,劃分任務項是有意義的,可是要註意必須通過任務項參數,協調待處理數據。否則多線程會反復處理
創建演示樣例,官方wiki上有圖示。上面主要是各參數的具體含義。
分布式開源調度框架TBSchedule原理與應用