1. 程式人生 > >並發模型(二)——Master-Worker模式

並發模型(二)——Master-Worker模式

string 程序 分配任務 之一 size void con .exe ||

Master-Worker模式是常用的並行模式之一,它的核心思想是,系統有兩個進程協作工作:Master進程,負責接收和分配任務;Worker進程,負責處理子任務。當Worker進程將子任務處理完成後,結果返回給Master進程,由Master進程做歸納匯總,最後得到最終的結果。

一、什麽是Master-Worker模式:

該模式的結構圖:

技術分享

結構圖:

技術分享

Worker:用於實際處理一個任務;

Master:任務的分配和最終結果的合成;

Main:啟動程序,調度開啟Master。

二、代碼實現:

下面的是一個簡易的Master-Worker框架實現。

(1)Master部分:

package MasterWorker;  
  
import java.util.HashMap;  
import java.util.Map;  
import java.util.Queue;  
import java.util.concurrent.ConcurrentHashMap;  
import java.util.concurrent.ConcurrentLinkedQueue;  
  
public class Master {  
  
    //任務隊列  
    protected Queue<Object> workQueue= new
ConcurrentLinkedQueue<Object>(); //Worker進程隊列 protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>(); //子任務處理結果集 protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>(); //是否所有的子任務都結束了 public boolean
isComplete(){ for(Map.Entry<String , Thread> entry:threadMap.entrySet()){ if(entry.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true ; } //Master的構造,需要一個Worker進程邏輯,和需要Worker進程數量 public Master(Worker worker,int countWorker){ worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); for(int i=0;i<countWorker;i++){ threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); } } //提交一個任務 public void submit(Object job){ workQueue.add(job); } //返回子任務結果集 public Map<String ,Object> getResultMap(){ return resultMap; } //開始運行所有的Worker進程,進行處理 public void execute(){ for(Map.Entry<String , Thread> entry:threadMap.entrySet()){ entry.getValue().start(); } } }

Worker進程實現:

package MasterWorker;  
  
import java.util.Map;  
import java.util.Queue;  
  
public class Worker  implements Runnable{  
  
    //任務隊列,用於取得子任務  
    protected Queue<Object> workQueue;  
    //子任務處理結果集  
    protected Map<String ,Object> resultMap;  
    public void setWorkQueue(Queue<Object> workQueue){  
        this.workQueue= workQueue;  
    }  
      
    public void setResultMap(Map<String ,Object> resultMap){  
        this.resultMap=resultMap;  
    }  
    //子任務處理的邏輯,在子類中實現具體邏輯  
    public Object handle(Object input){  
        return input;  
    }  
      
      
    @Override  
    public void run() {  
          
        while(true){  
            //獲取子任務  
            Object input= workQueue.poll();  
            if(input==null){  
                break;  
            }  
            //處理子任務  
            Object re = handle(input);  
            resultMap.put(Integer.toString(input.hashCode()), re);  
        }  
    }  
  
}  

運用這個小框架計算1——100的立方和,PlusWorker的實現:

package MasterWorker;  
  
public class PlusWorker extends Worker {  
  
    @Override  
    public Object handle(Object input) {  
          
        Integer i =(Integer)input;  
        return i*i*i;  
    }  
  
      
}  

進行計算的Main函數:

package MasterWorker;  
  
import java.util.Map;  
import java.util.Set;  
  
public class Main {  
  
      
    /** 
     * @param args 
     */  
    public static void main(String[] args) {  
        //固定使用5個Worker,並指定Worker  
        Master m = new Master(new PlusWorker(), 5);  
        //提交100個子任務  
        for(int i=0;i<100;i++){  
            m.submit(i);  
        }  
        //開始計算  
        m.execute();  
        int re= 0;  
        //保存最終結算結果  
        Map<String ,Object> resultMap =m.getResultMap();  
          
        //不需要等待所有Worker都執行完成,即可開始計算最終結果  
        while(resultMap.size()>0 || !m.isComplete()){  
            Set<String> keys = resultMap.keySet();  
            String key =null;  
            for(String k:keys){  
                key=k;  
                break;  
            }  
            Integer i =null;  
            if(key!=null){  
                i=(Integer)resultMap.get(key);  
            }  
            if(i!=null){  
                //最終結果  
                re+=i;  
            }  
            if(key!=null){  
                //移除已經被計算過的項  
                resultMap.remove(key);  
            }  
              
        }  
          
  
    }  
  
}  

三、總結:

Master-Worker模式是一種將串行任務並行化的方案,被分解的子任務在系統中可以被並行處理,同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果集。

並發模型(二)——Master-Worker模式