TBSchedule原始碼學習筆記-任務處理
上回說到每個執行緒組會建立自己的com.taobao.pamirs.schedule.taskmanager.TBScheduleManager
例項來管理執行緒組,一個JVM中該例項的個數與結合排程機數目分配給JVM的數目一致。TBScheduleManager例項中會計算排程任務的啟動時機(與控制檯介面設定保持一致)。實際開發一個排程任務按框架要求需要實現com.taobao.pamirs.schedule.IScheduleTaskDealMulti
或com.taobao.pamirs.schedule.IScheduleTaskDealSingle
介面。接著上邊的啟動過程來。
之前有說道:
這裡使用的“開啟服務”和“暫停服務”,分別使用了com.taobao.pamirs.schedule.taskmanager.TBScheduleManager類的public void resume(String message) throws Exception方法以及public void pause(String message) throws Exception方法
那麼就接著開啟看看public void resume(String message) throws Exception 方法,畢竟他是控制了一個執行緒組的啟動。每當到達排程時間(開始執行時間)都會呼叫該方法。
/**
* 處在了可執行的時間區間,恢復執行
* @throws Exception
*/
public void resume(String message) throws Exception{
if (this.isPauseSchedule == true) {
if(log.isDebugEnabled()){
log.debug("恢復排程:" + this.currenScheduleServer.getUuid());
}
this .isPauseSchedule = false;
this.pauseMessage = message;
if (this.taskDealBean != null) {
if (this.taskTypeInfo.getProcessorType() != null &&
this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){
this.taskTypeInfo.setProcessorType("NOTSLEEP");
= new TBScheduleProcessorNotSleep(this,
taskDealBean,this.statisticsInfo);
}else{
this.processor = new TBScheduleProcessorSleep(this,
taskDealBean,this.statisticsInfo);
this.taskTypeInfo.setProcessorType("SLEEP");
}
}
rewriteScheduleInfo();
}
}
大佬們說常用SLEEP模式,所以這裡主要看對SLEEP 模式的處理,可見這裡判斷任務設定如果為sleep模式,則建立一個com.taobao.pamirs.schedule.taskmanager.TBScheduleProcessorSleep<T>
例項。這裡建立這個例項賦值給了TBScheduleManager的一個全域性變數,這個processor被拿去做了些什麼,難道是利用他啟動了一些執行緒?畢竟我這設定了好多的執行緒項,應該是在這裡selectTask並對資料分片的吧?所以先去看processor被拿去幹什麼用了。找了一圈發現以下幾個操作
/**
* 當伺服器停止的時候,呼叫此方法清除所有未處理任務,清除伺服器的註冊資訊。
* 也可能是控制中心發起的終止指令。
* 需要注意的是,這個方法必須在當前任務處理完畢後才能執行
* @throws Exception
*/
public void stop(String strategyName) throws Exception{
//.........//
this.processor.stopSchedule();
//.........//
}
/**
* 清除記憶體中所有的已經取得的資料和任務佇列,在心態更新失敗,或者發現註冊中心的排程資訊被刪除
*/
public void clearMemoInfo(){
//.........//
this.processor.clearAllHasFetchData();
//.........//
}
/**
* 超過執行的執行時間,暫時停止排程
* @throws Exception
*/
public void pause(String message) throws Exception{
//.........//
this.processor.stopSchedule();
//.........//
}
好吧好像都是一些要求釋放資源的操作,並沒有開啟所以到這裡有一種不詳的預感“是不是又在建構函式裡開執行緒了?!”,帶著疑問開啟TBScheduleProcessorSleep類的程式碼。嗯,看到了關鍵字 startThread 看來在這個類裡啟動執行緒無疑了。
/**
* 建立一個排程處理器
* @param aManager
* @param aTaskDealBean
* @param aStatisticsInfo
* @throws Exception
*/
public TBScheduleProcessorSleep(TBScheduleManager aManager,
IScheduleTaskDeal<T> aTaskDealBean, StatisticsInfo aStatisticsInfo) throws Exception {
//執行緒組管理器
this.scheduleManager = aManager;
this.statisticsInfo = aStatisticsInfo;
//任務配置資訊
this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo();
//通過控制檯設定的bean
this.taskDealBean = aTaskDealBean;
//載入排程任務(實現IScheduleTaskDealMulti或IScheduleTaskDealSingle的區別)
if (this.taskDealBean instanceof IScheduleTaskDealSingle<?>) {
if (taskTypeInfo.getExecuteNumber() > 1) {
taskTypeInfo.setExecuteNumber(1);
}
isMutilTask = false;
} else {
isMutilTask = true;
}
if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) {
logger.warn("引數設定不合理,系統性能不佳。【每次從資料庫獲取的數量fetchnum】 >= 【執行緒數量threadnum】 *【最少迴圈次數10】 ");
}
//任務管理中"執行緒數" 配置,本例設定是5 ,所以會啟動5個執行緒。
for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {
//
this.startThread(i);
}
}
構造方法中確實啟動了n個執行緒,n為任務管理中設定的執行緒數,所以這裡利用執行緒做了些什麼呢?看一下startThread(int index)方法的原始碼。
private void startThread(int index) {
Thread thread = new Thread(this);
threadList.add(thread);
String threadName = this.scheduleManager.getScheduleServer().getTaskType()+"-"
+ this.scheduleManager.getCurrentSerialNumber() + "-exe"
+ index;
thread.setName(threadName);
thread.start();
}
發現這裡的thread 傳入的引數是this,this當然是自己了(com.taobao.pamirs.schedule.taskmanager.TBScheduleProcessorSleep<T>
),所以執行緒會執行這個類的run方法。
在看run的實現程式碼行之前,先看全域性變數,畢竟一個執行緒組會初始化一個TBScheduleProcessorSleep物件,假如設定了執行緒數為5,那麼會啟動5個執行緒,這5個執行緒之間必然會共享或同步一些資料,至少要共享selectTask所返回的資料供執行緒execute。所以看一下這個TBScheduleProcessorSleep類的全域性變數。
/**
* 哦?這個是什麼,mark
*/
final LockObject m_lockObject = new LockObject();
/**
* 哦?這個又是什麼?mark
*/
List<Thread> threadList = new CopyOnWriteArrayList<Thread>();
/**
* 工作管理員
*/
protected TBScheduleManager scheduleManager;
/**
* 任務型別
*/
ScheduleTaskType taskTypeInfo;
/**
* 任務處理的介面類
*/
protected IScheduleTaskDeal<T> taskDealBean;
/**
* 當前任務佇列的版本號
*/
protected long taskListVersion = 0;
final Object lockVersionObject = new Object();
final Object lockRunningList = new Object();
/**
* 哦?這個又又是什麼?mark
*/
protected List<T> taskList = new CopyOnWriteArrayList<T>();
/**
* 是否可以批處理
*/
boolean isMutilTask = false;
/**
* 是否已經獲得終止排程訊號
*/
boolean isStopSchedule = false;// 使用者停止佇列排程
boolean isSleeping = false;
/**
* 一些統計資料
*/
StatisticsInfo statisticsInfo;
除了一些公共配置資訊,目前存疑問的就是以下幾個,執行緒間共享這些資料的作用,還有如何同步的?selectTask所查出的資料如何避免叢集間重複讀取和使用?
final LockObject m_lockObject = new LockObject();
List<Thread> threadList = new CopyOnWriteArrayList<Thread>();
protected List<T> taskList = new CopyOnWriteArrayList<T>();
注意java.util.concurrent.CopyOnWriteArrayList是執行緒安全的,CopyOnWrite併發容器用於讀多寫少的併發場景。既然是一個執行緒安全的併發容器,這樣就可以不用關心執行緒間threadList和taskList的資料一致性啦。
帶著疑問開啟run方法(吐槽原始碼格式有點亂)
@SuppressWarnings({"rawtypes", "unchecked", "static-access"})
public void run() {
try {
//開始執行時間
long startTime = 0;
while (true) {
//這個方法有鎖,鎖在m_lockObject 例項上。
this.m_lockObject.addThread();
Object executeTask;
while (true) {
//如果發起了停止的請求,釋放當前執行緒
if (this.isStopSchedule == true) {//停止佇列排程
this.m_lockObject.realseThread();
this.m_lockObject.notifyOtherThread();//通知所有的休眠執行緒
synchronized (this.threadList) {
this.threadList.remove(Thread.currentThread());
if (this.threadList.size() == 0) {
this.scheduleManager.unRegisterScheduleServer();
}
}
return;
}
//載入排程任務(實現IScheduleTaskDealMulti或IScheduleTaskDealSingle的區別)
if (this.isMutilTask == false) {
executeTask = this.getScheduleTaskId();
} else {
executeTask = this.getScheduleTaskIdMulti();
}
//如果沒有資料打破迴圈
if (executeTask == null) {
break;
}
try {//執行相關的程式
startTime = scheduleManager.scheduleCenter.getSystemTime();
if (this.isMutilTask == false) {
if (((IScheduleTaskDealSingle) this.taskDealBean).execute(executeTask, scheduleManager.getScheduleServer().getOwnSign()) == true) {
addSuccessNum(1, scheduleManager.scheduleCenter.getSystemTime()
- startTime,
"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
} else {
addFailNum(1, scheduleManager.scheduleCenter.getSystemTime()
- startTime,
"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
}
} else {
if (((IScheduleTaskDealMulti) this.taskDealBean)
.execute((Object[]) executeTask, scheduleManager.getScheduleServer().getOwnSign()) == true) {
addSuccessNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime()
- startTime,
"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
} else {
addFailNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime()
- startTime,
"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
}
}
} catch (Throwable ex) {
if (this.isMutilTask == false) {
addFailNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime,
"TBScheduleProcessor.run");
} else {
addFailNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime()
- startTime,
"TBScheduleProcessor.run");
}
logger.warn("Task :" + executeTask + " 處理失敗", ex);
}
}
//當前佇列中所有的任務都已經完成了。
if (logger.isTraceEnabled()) {
logger.trace(Thread.currentThread().getName() + ":當前執行執行緒數量:" + this.m_lockObject.count());
}
if (this.m_lockObject.realseThreadButNotLast() == false) {
int size = 0;
Thread.currentThread().sleep(100);
startTime = scheduleManager.scheduleCenter.getSystemTime();
// 裝載資料
size = this.loadScheduleData();
if (size > 0) {
this.m_lockObject.notifyOtherThread();
} else {
//判斷當沒有資料的是否,是否需要退出排程
if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData() == true) {
if (logger.isTraceEnabled()) {
logger.trace("沒有裝載到資料,start sleep");
}
this.isSleeping = true;
Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());
this.isSleeping = false;
if (logger.isTraceEnabled()) {
logger.trace("Sleep end");
}
} else {
//沒有資料,退出排程,喚醒所有沉睡執行緒
this.m_lockObject.notifyOtherThread();
}
}
this.m_lockObject.realseThread();
} else {// 將當前執行緒放置到等待佇列中。直到有執行緒裝載到了新的任務資料
if (logger.isTraceEnabled()) {
logger.trace("不是最後一個執行緒,sleep");
}
this.m_lockObject.waitCurrentThread();
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
貌似這個方法還挺長…..,但是也不難發現是通過以下程式碼段查詢任務資料的。
public void run() {
//......//
if (this.isMutilTask == false) {
executeTask = this.getScheduleTaskId();
} else {
executeTask = this.getScheduleTaskIdMulti();
}
}
//......//
}
public synchronized Object getScheduleTaskId() {
if (this.taskList.size() > 0)
return this.taskList.remove(0); // 按正序處理
return null;
}
public synchronized Object[] getScheduleTaskIdMulti() {
if (this.taskList.size() == 0){
return null;
}
int size = taskList.size() > taskTypeInfo.getExecuteNumber() ? taskTypeInfo.getExecuteNumber()
: taskList.size();
Object[] result = null;
if(size >0){
result =(Object[])Array.newInstance(this.taskList.get(0).getClass(),size);
}
for(int i=0;i<size;i++){
result[i] = this.taskList.remove(0); // 按正序處理
}
return result;
}
發現取數的操作和taskList相關,所以this.taskList 即是通過selectTask 方法查詢出的任務資料。即該資料會存放在taskList屬性中由執行緒間共享。這裡明擺著只是操作移除taskList中的資料給execute方法使用,所以taskList方法中的資料從哪來?仔細尋找映入眼簾的關鍵程式碼在這。
public void run() {
//......//
while(true){
//......//
while(true){
//......//
}
//......//
size = this.loadScheduleData();
//......//
}
}
protected int loadScheduleData() {
try {
//在每次資料處理完畢後休眠固定的時間
if (this.taskTypeInfo.getSleepTimeInterval() > 0) {
if(logger.isTraceEnabled()){
logger.trace("處理完一批資料後休眠:" + this.taskTypeInfo.getSleepTimeInterval());
}
this.isSleeping = true;
Thread.sleep(taskTypeInfo.getSleepTimeInterval());
this.isSleeping = false;
if(logger.isTraceEnabled()){
logger.trace("處理完一批資料後休眠後恢復");
}
}
//哦?這個操作點開看滿複雜的。所以有必要詳細研究下。
List<TaskItemDefine> taskItems = this.scheduleManager.getCurrentScheduleTaskItemList();
// 根據佇列資訊查詢需要排程的資料,然後增加到任務列表中
if (taskItems.size() > 0) {
List<TaskItemDefine> tmpTaskList= new ArrayList<TaskItemDefine>();
//拷貝一份資料
synchronized(){
for (TaskItemDefine taskItemDefine : taskItems) {
tmpTaskList.add(taskItemDefine);
}
}
//呼叫
List<T> tmpList = this.taskDealBean.selectTasks(
taskTypeInfo.getTaskParameter(),
scheduleManager.getScheduleServer().getOwnSign(),
this.scheduleManager.getTaskItemCount(), tmpTaskList,
taskTypeInfo.getFetchDataNumber());
//更新上一次查詢資料的時間。
scheduleManager.getScheduleServer().setLastFetchDataTime(new Timestamp(scheduleManager.scheduleCenter.getSystemTime()));
if(tmpList != null){
//重要:將任務項配置放到執行緒組裡來。
this.taskList.addAll(tmpList);
}
} else {
if(logger.isTraceEnabled()){
logger.trace("沒有獲取到需要處理的資料佇列");
}
}
addFetchNum(taskList.size(),"TBScheduleProcessor.loadScheduleData");
return this.taskList.size();
} catch (Throwable ex) {
logger.error("Get tasks error.", ex);
}
return 0;
}
以上程式碼中已經可以看見資料來源確實是呼叫了任務實現的selectTask,並把資料重新整理到共享變數中。上面List<TaskItemDefine> taskItems = this.scheduleManager.getCurrentScheduleTaskItemList();
這個疑問涉及到叢集間資料同步,一系列zk操作,日後整理。
從實際的使用經驗看,taskItemDefine物件應該是按任務項個數來分配的,而selectTask的時候會使用傳入的taskItemDefine物件作為查詢條件決定資料的查詢規模。當然如果不正確實現任務處理介面的化,所有的分片都白扯,例如直接無視taskItemDefine引數。
TaskItem任務項
是對任務進行的分片劃分。例如:
1、將一個數據表中所有資料的ID按10取模,就將資料劃分成了0、1、2、3、4、5、6、7、8、9供10個任務項。
2、將一個目錄下的所有檔案按檔名稱的首字母(不區分大小寫),
就劃分成了A、B、C、D、E、F、G、H、I、J、K、L、M、N、O、P、Q、R、S、T、U、V、W、X、Y、Z供26個佇列。
3、將一個數據表的資料ID雜湊後按1000取模作為最後的HASHCODE,我們就可以將資料按[0,100)、[100,200) 、[200,300)、[300,400) 、
[400,500)、[500,600)、[600,700)、[700,800)、[800,900)、 [900,1000)劃分為十個任務項,當然你也可以劃分為100個任務項,最多是1000個任務項。
任務項是進行任務分配的最小單位。一個任務項只能由一個ScheduleServer來進行處理。但一個Server可以處理任意數量的任務項。
例如任務被劃分為了10個佇列,可以只啟動一個Server,所有的任務項都有這一個Server來處理;也可以啟動兩個Server,每個Sever處理5個任務項;
但最多隻能啟動10個Server,每一個ScheduleServer只處理一個任務項。如果在多,則第11個及之後的Server將不起作用,處於休眠狀態。
4、可以為一個任務項自定義一個字串引數由應用自己解析。例如:”TYPE=A,KIND=1”
把loadScheduleData()和之前的executeTask = this.getScheduleTaskId(); 連起來,繼續回到run方法的兩層迴圈
public void run() {
//......//
while(true){
this.m_lockObject.addThread();
//......//
while(true){
//......//
executeTask = this.getScheduleTaskId();
//如果沒有資料打破迴圈
if (executeTask == null) {
break;
}
//否則呼叫execute方法消耗資料
}
//......//
if (this.m_lockObject.realseThreadButNotLast() == false) {
int size = 0;
Thread.currentThread().sleep(100);
startTime =scheduleManager.scheduleCenter.getSystemTime();
// 裝載資料
size = this.loadScheduleData();
if (size > 0) {
this.m_lockObject.notifyOtherThread();
} else {
//判斷當沒有資料的是否,是否需要退出排程
if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){
if(logger.isTraceEnabled()){
logger.trace("沒有裝載到資料,start sleep");
}
this.isSleeping = true;
Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());
this.isSleeping = false;
if(logger.isTraceEnabled()){
logger.trace("Sleep end");
}
}else{
//沒有資料,退出排程,喚醒所有沉睡執行緒
this.m_lockObject.notifyOtherThread();
}
}
this.m_lockObject.realseThread();
} else {// 將當前執行緒放置到等待佇列中。直到有執行緒裝載到了新的任務資料
if(logger.isTraceEnabled()){
logger.trace("不是最後一個執行緒,sleep");
}
this.m_lockObject.waitCurrentThread();
}
//......//
}
}
再把鎖的程式碼加回來,能發現對鎖的操作:
1.在開啟執行緒後每一次迴圈在LockObject物件中記錄一個執行緒數,並在最後釋放執行緒數,釋放後阻塞當前執行緒,直至釋放到最後一個執行緒,然後去呼叫selectTask裡面載入資料。
2.如果查詢到資料就喚醒所有等待的執行緒。否則判斷是否需要暫停排程,如果不需要退出排程會繼續喚醒執行緒。各執行緒然後會繼續執行1的操作。
這樣就能保證在一個執行緒組裡只有一個執行緒可以做selectTask操作。而根據程式碼上看taskItemDefine貌似是在叢集中隨機分配的,這樣就能保證正確實現任務處理介面的情況下,針對每個任務資料在叢集中是平均分配的(因為selectTask方法使用taskItemDefine作為查詢引數,對資料進行分片)