1. 程式人生 > >TBSchedule原始碼學習筆記-任務處理

TBSchedule原始碼學習筆記-任務處理

上回說到每個執行緒組會建立自己的com.taobao.pamirs.schedule.taskmanager.TBScheduleManager例項來管理執行緒組,一個JVM中該例項的個數與結合排程機數目分配給JVM的數目一致。TBScheduleManager例項中會計算排程任務的啟動時機(與控制檯介面設定保持一致)。實際開發一個排程任務按框架要求需要實現com.taobao.pamirs.schedule.IScheduleTaskDealMulticom.taobao.pamirs.schedule.IScheduleTaskDealSingle介面。接著上邊的啟動過程來。
之前有說道:

這裡使用的“開啟服務”和“暫停服務”,分別使用了com.taobao.pamirs.schedule.taskmanager.TBScheduleManager類的public void resume(String message) throws Exception方法以及public void pause(String message) throws Exception方法

那麼就接著開啟看看public void resume(String message) throws Exception 方法,畢竟他是控制了一個執行緒組的啟動。每當到達排程時間(開始執行時間)都會呼叫該方法。


/**
     * 處在了可執行的時間區間,恢復執行
     * @throws Exception 
     */
    public void resume(String message) throws Exception{
        if (this.isPauseSchedule == true) {
            if(log.isDebugEnabled()){
                log.debug("恢復排程:" + this.currenScheduleServer.getUuid());
            }
            this
.isPauseSchedule = false; this.pauseMessage = message; if (this.taskDealBean != null) { if (this.taskTypeInfo.getProcessorType() != null && this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){ this.taskTypeInfo.setProcessorType("NOTSLEEP"); = new TBScheduleProcessorNotSleep(this, taskDealBean,this.statisticsInfo); }else{ this.processor = new TBScheduleProcessorSleep(this, taskDealBean,this.statisticsInfo); this.taskTypeInfo.setProcessorType("SLEEP"); } } rewriteScheduleInfo(); } }

大佬們說常用SLEEP模式,所以這裡主要看對SLEEP 模式的處理,可見這裡判斷任務設定如果為sleep模式,則建立一個com.taobao.pamirs.schedule.taskmanager.TBScheduleProcessorSleep<T>例項。這裡建立這個例項賦值給了TBScheduleManager的一個全域性變數,這個processor被拿去做了些什麼,難道是利用他啟動了一些執行緒?畢竟我這設定了好多的執行緒項,應該是在這裡selectTask並對資料分片的吧?所以先去看processor被拿去幹什麼用了。找了一圈發現以下幾個操作

/**
     * 當伺服器停止的時候,呼叫此方法清除所有未處理任務,清除伺服器的註冊資訊。
     * 也可能是控制中心發起的終止指令。
     * 需要注意的是,這個方法必須在當前任務處理完畢後才能執行
     * @throws Exception 
     */
    public void stop(String strategyName) throws Exception{
        //.........//
            this.processor.stopSchedule();
        //.........//
    }
/**
     * 清除記憶體中所有的已經取得的資料和任務佇列,在心態更新失敗,或者發現註冊中心的排程資訊被刪除
     */
    public void clearMemoInfo(){
        //.........//
                this.processor.clearAllHasFetchData();
        //.........//
    }
/**
     * 超過執行的執行時間,暫時停止排程
     * @throws Exception 
     */
    public void pause(String message) throws Exception{
        //.........//
                this.processor.stopSchedule();
        //.........//
    }

好吧好像都是一些要求釋放資源的操作,並沒有開啟所以到這裡有一種不詳的預感“是不是又在建構函式裡開執行緒了?!”,帶著疑問開啟TBScheduleProcessorSleep類的程式碼。嗯,看到了關鍵字 startThread 看來在這個類裡啟動執行緒無疑了。

/**
     * 建立一個排程處理器 
     * @param aManager
     * @param aTaskDealBean
     * @param aStatisticsInfo
     * @throws Exception
     */
    public TBScheduleProcessorSleep(TBScheduleManager aManager,
            IScheduleTaskDeal<T> aTaskDealBean, StatisticsInfo aStatisticsInfo) throws Exception {
        //執行緒組管理器
        this.scheduleManager = aManager;
        this.statisticsInfo = aStatisticsInfo;
        //任務配置資訊
        this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo();
        //通過控制檯設定的bean
        this.taskDealBean = aTaskDealBean;
         //載入排程任務(實現IScheduleTaskDealMulti或IScheduleTaskDealSingle的區別)
        if (this.taskDealBean instanceof IScheduleTaskDealSingle<?>) {
            if (taskTypeInfo.getExecuteNumber() > 1) {
                taskTypeInfo.setExecuteNumber(1);
            }
            isMutilTask = false;
        } else {
            isMutilTask = true;
        }
        if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) {
            logger.warn("引數設定不合理,系統性能不佳。【每次從資料庫獲取的數量fetchnum】 >= 【執行緒數量threadnum】 *【最少迴圈次數10】 ");
        }
        //任務管理中"執行緒數" 配置,本例設定是5 ,所以會啟動5個執行緒。
        for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {
            //
            this.startThread(i);
        }
    }

構造方法中確實啟動了n個執行緒,n為任務管理中設定的執行緒數,所以這裡利用執行緒做了些什麼呢?看一下startThread(int index)方法的原始碼。

private void startThread(int index) {
        Thread thread = new Thread(this);
        threadList.add(thread);
        String threadName = this.scheduleManager.getScheduleServer().getTaskType()+"-" 
                + this.scheduleManager.getCurrentSerialNumber() + "-exe"
                + index;
        thread.setName(threadName);
        thread.start();
    }

發現這裡的thread 傳入的引數是this,this當然是自己了(com.taobao.pamirs.schedule.taskmanager.TBScheduleProcessorSleep<T>),所以執行緒會執行這個類的run方法。
在看run的實現程式碼行之前,先看全域性變數,畢竟一個執行緒組會初始化一個TBScheduleProcessorSleep物件,假如設定了執行緒數為5,那麼會啟動5個執行緒,這5個執行緒之間必然會共享或同步一些資料,至少要共享selectTask所返回的資料供執行緒execute。所以看一下這個TBScheduleProcessorSleep類的全域性變數。

    /**
    * 哦?這個是什麼,mark
*/
    final  LockObject   m_lockObject = new LockObject();
    /**
    * 哦?這個又是什麼?mark
*/
    List<Thread> threadList =  new CopyOnWriteArrayList<Thread>();
    /**
     * 工作管理員
     */
    protected TBScheduleManager scheduleManager;
    /**
     * 任務型別
     */
    ScheduleTaskType taskTypeInfo;

    /**
     * 任務處理的介面類
     */
    protected IScheduleTaskDeal<T> taskDealBean;
    /**
     * 當前任務佇列的版本號
     */
    protected long taskListVersion = 0;
    final Object lockVersionObject = new Object();
    final Object lockRunningList = new Object();
    /**
    * 哦?這個又又是什麼?mark
    */
    protected List<T> taskList = new CopyOnWriteArrayList<T>();

    /**
     * 是否可以批處理
     */
    boolean isMutilTask = false;

    /**
     * 是否已經獲得終止排程訊號
     */
    boolean isStopSchedule = false;// 使用者停止佇列排程
    boolean isSleeping = false;
    /**
    * 一些統計資料
*/
    StatisticsInfo statisticsInfo;

除了一些公共配置資訊,目前存疑問的就是以下幾個,執行緒間共享這些資料的作用,還有如何同步的?selectTask所查出的資料如何避免叢集間重複讀取和使用?

final  LockObject   m_lockObject = new LockObject();
List<Thread> threadList =  new CopyOnWriteArrayList<Thread>();
protected List<T> taskList = new CopyOnWriteArrayList<T>();

注意java.util.concurrent.CopyOnWriteArrayList是執行緒安全的,CopyOnWrite併發容器用於讀多寫少的併發場景。既然是一個執行緒安全的併發容器,這樣就可以不用關心執行緒間threadList和taskList的資料一致性啦。
帶著疑問開啟run方法(吐槽原始碼格式有點亂)

 @SuppressWarnings({"rawtypes", "unchecked", "static-access"})
    public void run() {
        try {
            //開始執行時間
            long startTime = 0;
            while (true) {
                //這個方法有鎖,鎖在m_lockObject 例項上。
                this.m_lockObject.addThread();
                Object executeTask;
                while (true) {
                    //如果發起了停止的請求,釋放當前執行緒
                    if (this.isStopSchedule == true) {//停止佇列排程
                        this.m_lockObject.realseThread();
                        this.m_lockObject.notifyOtherThread();//通知所有的休眠執行緒
                        synchronized (this.threadList) {
                            this.threadList.remove(Thread.currentThread());
                            if (this.threadList.size() == 0) {
                                this.scheduleManager.unRegisterScheduleServer();
                            }
                        }
                        return;
                    }

                    //載入排程任務(實現IScheduleTaskDealMulti或IScheduleTaskDealSingle的區別)
                    if (this.isMutilTask == false) {
                        executeTask = this.getScheduleTaskId();
                    } else {
                        executeTask = this.getScheduleTaskIdMulti();
                    }
                    //如果沒有資料打破迴圈
                    if (executeTask == null) {
                        break;
                    }

                    try {//執行相關的程式
                        startTime = scheduleManager.scheduleCenter.getSystemTime();
                        if (this.isMutilTask == false) {
                            if (((IScheduleTaskDealSingle) this.taskDealBean).execute(executeTask, scheduleManager.getScheduleServer().getOwnSign()) == true) {
                                addSuccessNum(1, scheduleManager.scheduleCenter.getSystemTime()
                                                - startTime,
                                        "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                            } else {
                                addFailNum(1, scheduleManager.scheduleCenter.getSystemTime()
                                                - startTime,
                                        "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                            }
                        } else {
                            if (((IScheduleTaskDealMulti) this.taskDealBean)
                                    .execute((Object[]) executeTask, scheduleManager.getScheduleServer().getOwnSign()) == true) {
                                addSuccessNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime()
                                                - startTime,
                                        "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                            } else {
                                addFailNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime()
                                                - startTime,
                                        "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                            }
                        }
                    } catch (Throwable ex) {
                        if (this.isMutilTask == false) {
                            addFailNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime,
                                    "TBScheduleProcessor.run");
                        } else {
                            addFailNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime()
                                            - startTime,
                                    "TBScheduleProcessor.run");
                        }
                        logger.warn("Task :" + executeTask + " 處理失敗", ex);
                    }
                }
                //當前佇列中所有的任務都已經完成了。
                if (logger.isTraceEnabled()) {
                    logger.trace(Thread.currentThread().getName() + ":當前執行執行緒數量:" + this.m_lockObject.count());
                }
                if (this.m_lockObject.realseThreadButNotLast() == false) {
                    int size = 0;
                    Thread.currentThread().sleep(100);
                    startTime = scheduleManager.scheduleCenter.getSystemTime();
                    // 裝載資料
                    size = this.loadScheduleData();
                    if (size > 0) {
                        this.m_lockObject.notifyOtherThread();
                    } else {
                        //判斷當沒有資料的是否,是否需要退出排程
                        if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData() == true) {
                            if (logger.isTraceEnabled()) {
                                logger.trace("沒有裝載到資料,start sleep");
                            }
                            this.isSleeping = true;
                            Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());
                            this.isSleeping = false;

                            if (logger.isTraceEnabled()) {
                                logger.trace("Sleep end");
                            }
                        } else {
                            //沒有資料,退出排程,喚醒所有沉睡執行緒
                            this.m_lockObject.notifyOtherThread();
                        }
                    }
                    this.m_lockObject.realseThread();
                } else {// 將當前執行緒放置到等待佇列中。直到有執行緒裝載到了新的任務資料
                    if (logger.isTraceEnabled()) {
                        logger.trace("不是最後一個執行緒,sleep");
                    }
                    this.m_lockObject.waitCurrentThread();
                }
            }
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }
    }

貌似這個方法還挺長…..,但是也不難發現是通過以下程式碼段查詢任務資料的。

public void run() {
    //......//
    if (this.isMutilTask == false) {
                        executeTask = this.getScheduleTaskId();
                    } else {
                        executeTask = this.getScheduleTaskIdMulti();
                    }
    }
    //......//
}

                    public synchronized Object getScheduleTaskId() {
                                 if (this.taskList.size() > 0)
                                     return this.taskList.remove(0);  // 按正序處理
                                 return null;
                               }

                               public synchronized Object[] getScheduleTaskIdMulti() {
                                   if (this.taskList.size() == 0){
                                     return null;
                                   }
                                   int size = taskList.size() > taskTypeInfo.getExecuteNumber() ? taskTypeInfo.getExecuteNumber()
                                            : taskList.size();

                                   Object[] result = null;
                                   if(size >0){
                                       result =(Object[])Array.newInstance(this.taskList.get(0).getClass(),size);
                                   }
                                   for(int i=0;i<size;i++){
                                     result[i] = this.taskList.remove(0);  // 按正序處理
                                   }
                                   return result;
                               }

發現取數的操作和taskList相關,所以this.taskList 即是通過selectTask 方法查詢出的任務資料。即該資料會存放在taskList屬性中由執行緒間共享。這裡明擺著只是操作移除taskList中的資料給execute方法使用,所以taskList方法中的資料從哪來?仔細尋找映入眼簾的關鍵程式碼在這。

public void run() {
     //......//
    while(true){
         //......//
        while(true){
             //......//

        }
        //......//
        size = this.loadScheduleData();
        //......//
    }
}
protected int loadScheduleData() {
        try {
           //在每次資料處理完畢後休眠固定的時間
            if (this.taskTypeInfo.getSleepTimeInterval() > 0) {
                if(logger.isTraceEnabled()){
                    logger.trace("處理完一批資料後休眠:" + this.taskTypeInfo.getSleepTimeInterval());
                }
                this.isSleeping = true;
                Thread.sleep(taskTypeInfo.getSleepTimeInterval());
                this.isSleeping = false;

                if(logger.isTraceEnabled()){
                    logger.trace("處理完一批資料後休眠後恢復");
                }
            }
            //哦?這個操作點開看滿複雜的。所以有必要詳細研究下。
            List<TaskItemDefine> taskItems = this.scheduleManager.getCurrentScheduleTaskItemList();
            // 根據佇列資訊查詢需要排程的資料,然後增加到任務列表中
            if (taskItems.size() > 0) {
                List<TaskItemDefine> tmpTaskList= new ArrayList<TaskItemDefine>();
                //拷貝一份資料
                synchronized(){
                    for (TaskItemDefine taskItemDefine : taskItems) {
                        tmpTaskList.add(taskItemDefine);
                    }
                }
                //呼叫
                List<T> tmpList = this.taskDealBean.selectTasks(
                        taskTypeInfo.getTaskParameter(),
                        scheduleManager.getScheduleServer().getOwnSign(),
                        this.scheduleManager.getTaskItemCount(), tmpTaskList,
                        taskTypeInfo.getFetchDataNumber());
                //更新上一次查詢資料的時間。
                scheduleManager.getScheduleServer().setLastFetchDataTime(new Timestamp(scheduleManager.scheduleCenter.getSystemTime()));
                if(tmpList != null){
                    //重要:將任務項配置放到執行緒組裡來。
                   this.taskList.addAll(tmpList);
                }
            } else {
                if(logger.isTraceEnabled()){
                       logger.trace("沒有獲取到需要處理的資料佇列");
                }
            }
            addFetchNum(taskList.size(),"TBScheduleProcessor.loadScheduleData");
            return this.taskList.size();
        } catch (Throwable ex) {
            logger.error("Get tasks error.", ex);
        }
        return 0;
    }

以上程式碼中已經可以看見資料來源確實是呼叫了任務實現的selectTask,並把資料重新整理到共享變數中。上面List<TaskItemDefine> taskItems = this.scheduleManager.getCurrentScheduleTaskItemList(); 這個疑問涉及到叢集間資料同步,一系列zk操作,日後整理。
從實際的使用經驗看,taskItemDefine物件應該是按任務項個數來分配的,而selectTask的時候會使用傳入的taskItemDefine物件作為查詢條件決定資料的查詢規模。當然如果不正確實現任務處理介面的化,所有的分片都白扯,例如直接無視taskItemDefine引數。

TaskItem任務項
是對任務進行的分片劃分。例如:
1、將一個數據表中所有資料的ID按10取模,就將資料劃分成了0、1、2、3、4、5、6、7、8、9供10個任務項。
2、將一個目錄下的所有檔案按檔名稱的首字母(不區分大小寫),
就劃分成了A、B、C、D、E、F、G、H、I、J、K、L、M、N、O、P、Q、R、S、T、U、V、W、X、Y、Z供26個佇列。
3、將一個數據表的資料ID雜湊後按1000取模作為最後的HASHCODE,我們就可以將資料按[0,100)、[100,200) 、[200,300)、[300,400) 、
[400,500)、[500,600)、[600,700)、[700,800)、[800,900)、 [900,1000)劃分為十個任務項,當然你也可以劃分為100個任務項,最多是1000個任務項。
任務項是進行任務分配的最小單位。一個任務項只能由一個ScheduleServer來進行處理。但一個Server可以處理任意數量的任務項。
例如任務被劃分為了10個佇列,可以只啟動一個Server,所有的任務項都有這一個Server來處理;也可以啟動兩個Server,每個Sever處理5個任務項;
但最多隻能啟動10個Server,每一個ScheduleServer只處理一個任務項。如果在多,則第11個及之後的Server將不起作用,處於休眠狀態。
4、可以為一個任務項自定義一個字串引數由應用自己解析。例如:”TYPE=A,KIND=1”

把loadScheduleData()和之前的executeTask = this.getScheduleTaskId(); 連起來,繼續回到run方法的兩層迴圈

public void run() {
     //......//
    while(true){
         this.m_lockObject.addThread();
         //......//
        while(true){
             //......//
            executeTask = this.getScheduleTaskId();

            //如果沒有資料打破迴圈
            if (executeTask == null) {
                break;
            }
            //否則呼叫execute方法消耗資料
        }
        //......//
        if (this.m_lockObject.realseThreadButNotLast() == false) {
            int size = 0;
            Thread.currentThread().sleep(100);
            startTime =scheduleManager.scheduleCenter.getSystemTime();
            // 裝載資料
            size = this.loadScheduleData();
            if (size > 0) {
                this.m_lockObject.notifyOtherThread();
            } else {
                //判斷當沒有資料的是否,是否需要退出排程
                if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){                      
                    if(logger.isTraceEnabled()){
                           logger.trace("沒有裝載到資料,start sleep");
                    }
                    this.isSleeping = true;
                    Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());
                    this.isSleeping = false;

                    if(logger.isTraceEnabled()){
                           logger.trace("Sleep end");
                    }
                }else{
                    //沒有資料,退出排程,喚醒所有沉睡執行緒
                    this.m_lockObject.notifyOtherThread();
                }
            }
            this.m_lockObject.realseThread();
        } else {// 將當前執行緒放置到等待佇列中。直到有執行緒裝載到了新的任務資料
            if(logger.isTraceEnabled()){
                   logger.trace("不是最後一個執行緒,sleep");
            }
            this.m_lockObject.waitCurrentThread();
        }

        //......//
    }
}

再把鎖的程式碼加回來,能發現對鎖的操作:
1.在開啟執行緒後每一次迴圈在LockObject物件中記錄一個執行緒數,並在最後釋放執行緒數,釋放後阻塞當前執行緒,直至釋放到最後一個執行緒,然後去呼叫selectTask裡面載入資料。
2.如果查詢到資料就喚醒所有等待的執行緒。否則判斷是否需要暫停排程,如果不需要退出排程會繼續喚醒執行緒。各執行緒然後會繼續執行1的操作。

這樣就能保證在一個執行緒組裡只有一個執行緒可以做selectTask操作。而根據程式碼上看taskItemDefine貌似是在叢集中隨機分配的,這樣就能保證正確實現任務處理介面的情況下,針對每個任務資料在叢集中是平均分配的(因為selectTask方法使用taskItemDefine作為查詢引數,對資料進行分片)