流程控制:分布式並行任務流程控制
- 背景:
目前工作中遇到一個比較急,又有點費事的工作任務:
1)目前系統中已經已經包含了一些比較完善的部分模塊,但是模塊之間沒有一個控制流程來管理,就造成程序沒有辦法自動化;
2)已經完成模塊中有幾個是采用分布式部署,但各個服務器之間又是采用的並行執行不同的任務(目的最大化利用服務器,節省處理總耗時):這寫對流程化控制帶來了一些控制繁瑣問題。
3)目前並不需要考慮太多穩定行的問題,但是流程控制程序必須考慮到高可用性(就是需要部署為HA)。
- 目前已經擁有的功能模塊:
1)采集及更新工參、KPI、路測、掃頻等到數據庫:這些數據在ftp上,因此,每次更新需要從ftp上自動采集數據(當然是任務觸發時最新的數據)之後更新到數據庫中;
2)采集mr數據及解析mr,這裏分為了三個功能模塊:
2.1)采集mr功能:目前已經包含,但是功能還是不太完善,每次采集的觸發時間應該是當任務觸發時開啟采集(這樣避免了未領取到任務時盲目的采集,造成服務器性能下降其他分析慢,IO存儲不足問題),采集距離“采集任務”觸發時間最近且完整的數據;
2.2)mr大壓縮包解壓任務:目前系統中包含的mr數據文件格式比較復雜,可能包含大壓縮包套文件,或者達壓縮吧套小壓縮包的問題,最終需要加壓為最小壓縮包(也就是在解壓一次就是.xml文件);
2.3)mr解析:mr解析是一個分布式部署(部署在多臺服務器,按照公式:enb%服務器數量,得到的值來分配enb到具體哪臺服務器),但是每臺服務器與每臺服務器之間是沒有聯系的:每臺服務器只負責並行的處理分配到自己節點上的任務。
3)mr柵格化:由於上邊mr解析後分別存儲到自己節點服務器上的數據,因此這裏的mr柵格化數據也是分布式的部署在每臺節點服務器上。
- 解決方案思路:
為此,我寫了一個代碼邏輯框架:
文件Program.cs是代碼核心業務控制:
1)它支持部署HA:可以部署多臺服務器,和一臺服務器一樣工作。
2)核心業務控制思路:
1 static void Main(string[] args) 2 { 3 while (true) 4 { 5 // TaskLock在zookeeper或sqlserver數據中只存在唯一的一條記錄。6 TaskLock taskLock = TaskLockBO.Get(); 7 8 if (taskLock.Lock == LockStatus.Locked) 9 { 10 // 獲取正在執行的任務。。。。 11 List<Task> taskItems = TaskBO.GetToDoTaskByTaskGroup(taskLock.DoingTaskGroup); 12 13 if (taskItems == null || taskItems.Count == 0) 14 { 15 // 修改taskLock.Lock=UnLock、taskLock.DoingTaskGroup=Guid.Empty 16 } 17 else 18 { 19 // 按照任務的優先級執行task 20 // 開始調度每臺計算服務器上的任務。。。 21 // 1)如果第一個待執行(todo)的任務是“工參導入或更新”,修改其任務狀態為doing。 22 // “工參導入或更新”服務只可能部署在一臺服務器上或者就是這裏實現,當獲取到歸屬自己的任務狀態是doing時,就開始從ftp上采集工參數據,並解析導入,完成修改任務狀態為done,失敗修改任務狀態為fail 23 // 2)如果第一個待執行(todo)的任務是“采集MR”,修改其任務狀態為doing。 24 // “采集MR”服務也是只可能部署在一臺服務器上(但不可能在這裏執行),當獲取到歸屬自己的任務狀態是doing時,就開始監控ftp,並采集ftp數據到本地,完成修改任務狀態為done,失敗修改任務狀態為fail 25 // 3)如果第一個待執行(todo)的任務是“解壓超大壓縮包”,修改其任務狀態為doing. 26 // “解壓超大壓縮包”服務也是只可能部署在一臺服務器上(但不可能在這裏執行),當獲取到歸屬自己的任務狀態是doing時,就開始循環遍歷采集的mr,若找到超大壓縮包就進行超大壓縮包解壓,完成修改任務狀態為done,失敗修改任務狀態為fail 27 // 4)如果第一個待執行(todo)的任務是“解析MR”,修改任務狀態為predoing,循環遍歷ftp目錄的數據按照分發規則把mr問價分發到不同的計算節點服務器指定的位置,並創建“解析mr子任務”給每臺解析mr服務器,並修改該任務狀態為doing 28 // 註意:這裏是分布式處理的,因此給所有子節點分配任務後統一修改所有“解析MR”任務狀態為doing(每個compute包含一個“mr解析”任務). 29 // “解析MR”服務部署在多個解析處理服務器上,當獲取到歸屬自己的節點的“解析mr”任務狀態是doing時,就開始獲取自己的節點下的“解析mr子任務”逐個處理,處理完成後修改歸屬自己的節點“mr解析”任務狀態為done,失敗修改任務狀態為fail 30 // 註意:這裏是分布式處理的,因此需要考慮到等待所有節點“mr解析”都完成後,才可以進行下一步。 31 // 5)如果第一個待執行(todo)的任務是“mr柵格化”,修改任務狀態為doing. 32 // 註意:這裏是分布式處理的,因此給所有子節點分配任務後統一修改所有“MR柵格化”任務狀態為doing(每個compute包含一個“MR柵格化”任務). 33 // “MR柵格化”服務部署在多個處理服務器上,當獲取到歸屬自己的節點的“mr柵格化”任務狀態是doing時,開始逐個處理自己節點上的“mr柵格化”任務,處理完成後修改歸屬自己的節點“mr柵格化”任務狀態為done,失敗修改任務狀態為fail 34 // 註意:這裏是分布式處理的,因此需要考慮到等待所有節點“mr柵格化”都完成後,才可以進行下一步。 35 } 36 } 37 else if (taskLock.Lock == LockStatus.UnLock) 38 { 39 // 嘗試獲取新的任務。 40 // 修改taskLock.lock=PreLock 41 42 43 // 添加任務成功,則修改taskLock.Lock=Locked、taskLock.DoingTaskGroup賦值;添加任務失敗,則修改taskLock.Lock=UnLock 44 } 45 46 // 5 分鐘輪詢一次。 47 Thread.Sleep(5 * 60 * 1000); 48 } 49 }
3)任務狀態&類型&定義包含:
enum TaskType { /// <summary> /// 導入或更新工參、KPI等數據 /// </summary> ImportSiteCellKpi = 0, /// <summary> /// 采集MR數據 /// </summary> GatherMR = 1, /// <summary> /// 嘗試解壓包含多層壓縮包的MR數據 /// </summary> DoUnZipMR = 2, /// <summary> /// 解析入庫MR數據 /// </summary> DoParserMR = 3, /// <summary> /// MR柵格化 /// </summary> DoMRRaster = 4, } enum TaskStatus { Todo = 0, PreDoing = 1, Doing = 2, Done = 3, Fail = 4 } class Task { /// <summary> /// 如果為同一批次批量處理流程,則TaskGroup為同一個Guid值。 /// </summary> public Guid TaskGroup { get; set; } public int TaskId { get; set; } public TaskType TaskType { get; set; } public TaskStatus TaskStatus { get; set; } /// <summary> /// 任務優先級 /// </summary> public int Priority { get; set; } public string ComputeIP { get; set; } public DateTime CreateTime { get; set; } public DateTime DoingTime { get; set; } public DateTime DoneTime { get; set; } public DateTime FailTime { get; set; } }
4)任務流程控制鎖(全局鎖):
enum LockStatus { PreLock = 0, Locked = 1, UnLock = 2 }
/// <summary> /// 所有任務流程是否被Lock掉 /// 1)當一批次任務未完成之前就不允許有任何新的一批任務開始執行, /// 必須等到一批次任務流程執行完成後才可以執行,否則將會導致數據執行的速度過慢,或者導致數據混亂情況。 /// 2)整個系統中,要確保正在執行的任務流程只有唯一一個,否則系統將會造成性能底下,或者出現數據錯亂情況。 /// </summary> class TaskLock { public LockStatus Lock { get; set; } /// <summary> /// 當開始執行新的一批次任務流程是,把該批次任務流程組編號寫入此處,同時修改TaskLock.isLock為true. /// </summary> public Guid DoingTaskGroup { get; set; } }
代碼下載:
鏈接:http://pan.baidu.com/s/1pKIYwl1 密碼:mbl7
流程控制:分布式並行任務流程控制