Master-Worker設計模式介紹
阿新 • • 發佈:2018-04-21
stat 方式 tint 代碼實現 exe ide port client shm
Master-Worker模式是常用的並行設計模式。核心思想是,系統由兩個角色組成,Master和Worker,Master負責接收和分配任務,Worker負責處理子任務。任務處理過程中,Master還負責監督任務進展和Worker的健康狀態;Master將接收Client提交的任務,並將任務的進展匯總反饋給Client。各角色關系如下圖
Master-Worker模式滿足於可以將大任務劃分為小任務的場景,是一種分而治之的設計理念。通過多線程或者多進程多機器的模式,可以將小任務處理分發給更多的CPU處理,降低單個CPU的計算量,通過並發/並行提高任務的完成速度,提高系統的性能。
具體細節如上圖,Master對任務進行切分,並放入任務隊列;然後,觸發Worker處理任務。實際操作中,任務的分配有多種形式,如Master主動拉起Workder進程池或線程池,並將任務分配給Worker;或者由Worker主動領取任務,這樣的Worker一般是常駐進程;還有一種解耦的方式,即Master指做任務的接收、切分和結果統計,指定Worker的數量和性能指標,但不參與Worker的實際管理,而是交由第三方調度監控和調度Worker。
代碼實現Master-Worker模式:
Master代碼:
1 package com.hjf.master_worker; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.concurrent.ConcurrentHashMap; 6 import java.util.concurrent.ConcurrentLinkedQueue; 7 8 /** 9 * Master 10 * @author huangjianfei 11 */12 public class Master 13 { 14 //1:應該有一個承載任務的集合 15 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); 16 17 //2:使用hashmap去承載所有的worker對象 ThreadName------Worker 18 private HashMap<String,Thread> workers = new HashMap<>();19 20 //3:使用一個容器承載每一個worker並行執行任務的結果集 21 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String, Object>(); 22 23 //4:構造方法 24 public Master(Worker worker,int workerCount){ 25 //在worker中添加兩個引用 workQueue用於任務的領取 resultMap用於任務的提交 26 worker.setWorkerQueue(this.workQueue); 27 worker.setResultMap(this.resultMap); 28 29 for (int i = 0; i < workerCount; i++) 30 { 31 workers.put("子節點 "+i, new Thread(worker)); 32 } 33 } 34 35 //5:提交方法 36 public void submit(Task task){ 37 workQueue.add(task); 38 } 39 40 //6:需要有一個執行的方法(啟動應用程序 讓所有的worker工作) 41 public void execute(){ 42 //遍歷workers 分別去執行每一個worker 43 for (Map.Entry<String,Thread> me: workers.entrySet()) 44 { 45 me.getValue().start(); 46 } 47 } 48 49 /** 50 * 判斷所有的worker是否執行完畢 51 */ 52 public boolean isCompleted() 53 { 54 //遍歷所有的worker 只要有一個沒有停止 那麽就代表沒有結束 55 for (Map.Entry<String,Thread> me: workers.entrySet()) 56 { 57 if(me.getValue().getState() != Thread.State.TERMINATED){ 58 return false; 59 } 60 } 61 return true; 62 } 63 64 /** 65 * 計算最終的結果集 66 * @return 67 */ 68 public int getResult(){ 69 int result = 0; 70 for (Map.Entry<String,Object> me : resultMap.entrySet()) 71 { 72 result += (Integer)me.getValue(); 73 } 74 return result; 75 } 76 }
Worker代碼實現:
1 package com.hjf.master_worker; 2 3 import java.util.concurrent.ConcurrentHashMap; 4 import java.util.concurrent.ConcurrentLinkedQueue; 5 6 /** 7 * Worker 8 * @author huangjianfei 9 */ 10 public class Worker implements Runnable 11 { 12 private ConcurrentLinkedQueue<Task> workQueue; 13 14 private ConcurrentHashMap<String, Object> resultMap; 15 16 public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) 17 { 18 this.workQueue = workQueue; 19 } 20 21 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) 22 { 23 this.resultMap = resultMap; 24 } 25 26 @Override 27 public void run() 28 { 29 //處理一個個任務 30 while(true){ 31 //從隊列中取出一個元素 32 Task input = this.workQueue.poll(); 33 if(null == input) break; 34 //真正的去做業務處理 35 Object outPut = handle(input); 36 //存放任務的結果 37 this.resultMap.put(String.valueOf(input.getId()), outPut); 38 } 39 } 40 41 //單獨抽出來 給子類重寫,更加靈活 42 public Object handle(Task input){ 43 return null; 44 } 45 46 47 /** 48 * 處理業務 應該抽象出來 子類去具體實現業務邏輯 49 * @param input 50 */ 51 // private Object handle(Task input) 52 // { 53 // Object outPut = null; 54 // if(null == input) return null; 55 // try 56 // { 57 // //表示處理task任務的耗時,可能是數據的加工,也可能是操作數據庫 58 // Thread.sleep(5000); 59 // //模擬真實的業務場景 60 // outPut = input.getPrice(); 61 // } catch (InterruptedException e) 62 // { 63 // e.printStackTrace(); 64 // } 65 // return outPut; 66 // } 67 68 }
Task代碼實現:
1 package com.hjf.master_worker; 2 /** 3 * 任務 4 * @author huangjianfei 5 */ 6 public class Task 7 { 8 private int id; 9 private String name; 10 private int price; 11 public int getId() 12 { 13 return id; 14 } 15 public void setId(int id) 16 { 17 this.id = id; 18 } 19 public String getName() 20 { 21 return name; 22 } 23 public void setName(String name) 24 { 25 this.name = name; 26 } 27 public int getPrice() 28 { 29 return price; 30 } 31 public void setPrice(int price) 32 { 33 this.price = price; 34 } 35 36 }
Worker子類,在以後的開發中可以按照自己的需求去設計相關的Worker的子類:
1 package com.hjf.master_worker; 2 3 public class MyWorker1 extends Worker 4 { 5 @Override 6 public Object handle(Task input) 7 { 8 Object outPut = null; 9 if(null == input) return null; 10 try 11 { 12 //表示處理task任務的耗時,可能是數據的加工,也可能是操作數據庫 13 Thread.sleep(5000); 14 //模擬真實的業務場景 15 outPut = input.getPrice(); 16 } catch (InterruptedException e) 17 { 18 e.printStackTrace(); 19 } 20 return outPut; 21 } 22 }
Main測試類代碼:
1 package com.hjf.master_worker; 2 3 import java.util.Random; 4 /** 5 * 主線程測試類 6 * @author huangjianfei 7 */ 8 public class Main 9 { 10 public static void main(String[] args) 11 { 12 System.out.println("我的機器可用的Processor數量:"+Runtime.getRuntime().availableProcessors()); 13 // 使用worker子類實現具體的業務,更加靈活 14 Master master = new Master(new MyWorker1(), Runtime.getRuntime().availableProcessors()); 15 Random r = new Random(); 16 //提交100個任務 17 for (int i = 0; i <= 100; i++) 18 { 19 Task t = new Task(); 20 t.setId(i); 21 t.setName("任務 "+i); 22 t.setPrice(r.nextInt(1000)); 23 master.submit(t); 24 } 25 26 //執行所有的worker 27 master.execute(); 28 29 long start = System.currentTimeMillis();//記錄時間 30 31 32 while(true){ 33 //全部的worker執行結束的時候去計算最後的結果 34 if(master.isCompleted()){ 35 long end = System.currentTimeMillis() - start;//計算耗時 36 //計算結果集 37 int result = master.getResult(); 38 System.out.println("執行最終結果: "+result + " 執行耗時 "+end); 39 break; 40 } 41 } 42 43 } 44 45 }
Master-Worker設計模式介紹