並發模型(二)——Master-Worker模式
阿新 • • 發佈:2017-11-02
string 程序 分配任務 之一 size void con .exe ||
Master-Worker模式是常用的並行模式之一,它的核心思想是,系統有兩個進程協作工作:Master進程,負責接收和分配任務;Worker進程,負責處理子任務。當Worker進程將子任務處理完成後,結果返回給Master進程,由Master進程做歸納匯總,最後得到最終的結果。
一、什麽是Master-Worker模式:
該模式的結構圖:
結構圖:
Worker:用於實際處理一個任務;
Master:任務的分配和最終結果的合成;
Main:啟動程序,調度開啟Master。
二、代碼實現:
下面的是一個簡易的Master-Worker框架實現。
(1)Master部分:
package MasterWorker; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //任務隊列 protected Queue<Object> workQueue= newConcurrentLinkedQueue<Object>(); //Worker進程隊列 protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>(); //子任務處理結果集 protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>(); //是否所有的子任務都結束了 public booleanisComplete(){ for(Map.Entry<String , Thread> entry:threadMap.entrySet()){ if(entry.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true ; } //Master的構造,需要一個Worker進程邏輯,和需要Worker進程數量 public Master(Worker worker,int countWorker){ worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); for(int i=0;i<countWorker;i++){ threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); } } //提交一個任務 public void submit(Object job){ workQueue.add(job); } //返回子任務結果集 public Map<String ,Object> getResultMap(){ return resultMap; } //開始運行所有的Worker進程,進行處理 public void execute(){ for(Map.Entry<String , Thread> entry:threadMap.entrySet()){ entry.getValue().start(); } } }
Worker進程實現:
package MasterWorker; import java.util.Map; import java.util.Queue; public class Worker implements Runnable{ //任務隊列,用於取得子任務 protected Queue<Object> workQueue; //子任務處理結果集 protected Map<String ,Object> resultMap; public void setWorkQueue(Queue<Object> workQueue){ this.workQueue= workQueue; } public void setResultMap(Map<String ,Object> resultMap){ this.resultMap=resultMap; } //子任務處理的邏輯,在子類中實現具體邏輯 public Object handle(Object input){ return input; } @Override public void run() { while(true){ //獲取子任務 Object input= workQueue.poll(); if(input==null){ break; } //處理子任務 Object re = handle(input); resultMap.put(Integer.toString(input.hashCode()), re); } } }
運用這個小框架計算1——100的立方和,PlusWorker的實現:
package MasterWorker; public class PlusWorker extends Worker { @Override public Object handle(Object input) { Integer i =(Integer)input; return i*i*i; } }
進行計算的Main函數:
package MasterWorker; import java.util.Map; import java.util.Set; public class Main { /** * @param args */ public static void main(String[] args) { //固定使用5個Worker,並指定Worker Master m = new Master(new PlusWorker(), 5); //提交100個子任務 for(int i=0;i<100;i++){ m.submit(i); } //開始計算 m.execute(); int re= 0; //保存最終結算結果 Map<String ,Object> resultMap =m.getResultMap(); //不需要等待所有Worker都執行完成,即可開始計算最終結果 while(resultMap.size()>0 || !m.isComplete()){ Set<String> keys = resultMap.keySet(); String key =null; for(String k:keys){ key=k; break; } Integer i =null; if(key!=null){ i=(Integer)resultMap.get(key); } if(i!=null){ //最終結果 re+=i; } if(key!=null){ //移除已經被計算過的項 resultMap.remove(key); } } } }
三、總結:
Master-Worker模式是一種將串行任務並行化的方案,被分解的子任務在系統中可以被並行處理,同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果集。
並發模型(二)——Master-Worker模式