1. 程式人生 > >Schedulerx2.0分散式計算原理&最佳實踐

Schedulerx2.0分散式計算原理&最佳實踐

1. 前言

Schedulerx2.0的客戶端提供分散式執行、多種任務型別、統一日誌等框架,使用者只要依賴schedulerx-worker這個jar包,通過schedulerx2.0提供的程式設計模型,簡單幾行程式碼就能實現一套高可靠可運維的分散式執行引擎。

這篇文章重點是介紹基於schedulerx2.0的分散式執行引擎原理和最佳實踐,相信看完這篇文章,大家都能寫出高效率的分散式作業,說不定速度能提升好幾倍:)

2. 可擴充套件的執行引擎

Worker總體架構參考Yarn的架構,分為TaskMaster, Container, Processor三層:

  • TaskMaster:類似於yarn的AppMaster,支援可擴充套件的分散式執行框架,進行整個jobInstance的生命週期管理、container的資源管理,同時還有failover等能力。預設實現StandaloneTaskMaster(單機執行),BroadcastTaskMaster(廣播執行),MapTaskMaster(平行計算、記憶體網格、網格計算),MapReduceTaskMaster(平行計算、記憶體網格、網格計算)。
  • Container:執行業務邏輯的容器框架,支援執行緒/程序/docker/actor等。
  • Processor:業務邏輯框架,不同的processor表示不同的任務型別。

以MapTaskMaster為例,大概的原理如下圖所示:

3. 分散式程式設計模型之Map模型

Schedulerx2.0提供了多種分散式程式設計模型,這篇文章主要介紹Map模型(之後的文章還會介紹MapReduce模型,適用更多的業務場景),簡單幾行程式碼就可以將海量資料分散式到多臺機器上進行分散式跑批,非常簡單易用。

針對不同的跑批場景,map模型作業還提供了平行計算、記憶體網格、網格計算三種執行方式:

  • 平行計算:子任務300以下,有子任務列表。
  • 記憶體網格:子任務5W以下,無子任務列表,速度快。
  • 網格計算:子任務100W以下,無子任務列表。

4. 平行計算原理

因為並行任務具有子任務列表:


如上圖,子任務列表可以看到每個子任務的狀態、機器,還有重跑、檢視日誌等操作。

因為平行計算要做到子任務級別的視覺化,並且worker掛了、重啟還能支援手動重跑,就需要把task持久化到server端:


如上圖所示:

  1. server觸發jobInstance到某個worker,選中為master。
  2. MapTaskMaster選擇某個worker執行root任務,當執行map方法時,會回撥MapTaskMaster。
  3. MapTaskMaster收到map方法,會把task持久化到server端。
  4. 同時,MapTaskMaster還有個pull執行緒,不停拉取INIT狀態的task,並派發給其他worker執行。

5. 網格計算原理

網格計算要支援百萬級別的task,如果所有任務都往server回寫,server肯定扛不住,所以網格計算的儲存實際上是分散式在使用者自己的機器上的:

如上圖所示:

  1. server觸發jobInstance到某個worker,選中為master。
  2. MapTaskMaster選擇某個worker執行root任務,當執行map方法時,會回撥MapTaskMaster。
  3. MapTaskMaster收到map方法,會把task持久化到本地h2資料庫。
  4. 同時,MapTaskMaster還有個pull執行緒,不停拉取INIT狀態的task,並派發給其他worker執行。

6. 最佳實踐

6.1 需求

舉個例子:

  1. 讀取A表中status=0的資料。
  2. 處理這些資料,插入B表。
  3. 把A表中處理過的資料的修改status=1。
  4. 資料量有4億+,希望縮短時間。

6.2 反面案例

我們先看下如下程式碼是否有問題?

public class ScanSingleTableProcessor extends MapJobProcessor {
    private static int pageSize = 1000;

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();

        if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {
            int recordCount = queryRecordCount();
            int pageAmount = recordCount / pageSize;//計算分頁數量
            for(int i = 0 ; i < pageAmount ; i ++) {
                List<Record> recordList = queryRecord(i);//根據分頁查詢一頁資料
                map(recordList, "record記錄");//把子任務分發出去並行處理
            }
            return new ProcessResult(true);//true表示執行成功,false表示失敗
        } else if ("record記錄".equals(taskName)) {
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(false);
    }
}

如上面的程式碼所示,在root任務中,會把資料庫所有記錄讀取出來,每一行就是一個Record,然後分發出去,分散式到不同的worker上去執行。邏輯是沒有問題的,但是實際上效能非常的差。結合網格計算原理,我們把上面的程式碼繪製成下面這幅圖:

如上圖所示,root任務一開始會全量的讀取A表的資料,然後會全量的存到h2中,pull執行緒還會全量的從h2讀取一次所有的task,還會分發給所有客戶端。所以實際上對A表中的資料:

  • 全量讀2次
  • 全量寫一次
  • 全量傳輸一次

這個效率是非常低的。

6.3 正面案例

下面給出正面案例的程式碼:

public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //計算分頁數量
            for (int i = minId; i < maxId; i+=step) {
                taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("all tasks is finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        return null;
    }
}

如上面的程式碼所示,

  • 每個task不是整行記錄的record,而是PageTask,裡面就2個欄位,startId和endId。
  • root任務,沒有全量的讀取A表,而是讀一下整張表的minId和maxId,然後構造PageTask進行分頁。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每個task處理A表不同的資料。
  • 在下一級task中,如果拿到的是PageTask,再根據id區間去A表處理資料。

根據上面的程式碼和網格計算原理,得出下面這幅圖:

如上圖所示,

  • A表只需要全量讀取一次。
  • 子任務數量比反面案例少了上千、上萬倍。
  • 子任務的body非常小,如果recod中有大欄位,也少了上千、上萬倍。

綜上,對A表訪問次數少了好幾倍,對h2儲存壓力少了上萬倍,不但執行速度可以快很多,還保證不會把自己本地的h2資料庫搞掛。


原文連結
本文為雲棲社群原創內容,未經