1. 程式人生 > >多線程:多線程設計模式(三):Master-Worker模式

多線程:多線程設計模式(三):Master-Worker模式

fonts strong stat bre not 多線程 too () 部分

Master-Worker模式是常用的並行模式之一,它的核心思想是,系統有兩個進程協作工作:Master進程,負責接收和分配任務;Worker進程,負責處理子任務。當Worker進程將子任務處理完成後,結果返回給Master進程,由Master進程做歸納匯總,最後得到最終的結果。

一、什麽是Master-Worker模式:

該模式的結構圖:

技術分享

結構圖:

技術分享

Worker:用於實際處理一個任務;

Master:任務的分配和最終結果的合成;

Main:啟動程序,調度開啟Master。

二、代碼實現:

下面的是一個簡易的Master-Worker框架實現。

(1)Master部分:

[java] view plain copy
  1. package MasterWorker;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.Queue;
  5. import java.util.concurrent.ConcurrentHashMap;
  6. import java.util.concurrent.ConcurrentLinkedQueue;
  7. public class Master {
  8. //任務隊列
  9. protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();
  10. //Worker進程隊列
  11. protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();
  12. //子任務處理結果集
  13. protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();
  14. //是否所有的子任務都結束了
  15. public boolean isComplete(){
  16. for(Map.Entry<String , Thread> entry:threadMap.entrySet()){
  17. if(entry.getValue().getState()!=Thread.State.TERMINATED){
  18. return false;
  19. }
  20. }
  21. return true ;
  22. }
  23. //Master的構造,需要一個Worker進程邏輯,和需要Worker進程數量
  24. public Master(Worker worker,int countWorker){
  25. worker.setWorkQueue(workQueue);
  26. worker.setResultMap(resultMap);
  27. for(int i=0;i<countWorker;i++){
  28. threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
  29. }
  30. }
  31. //提交一個任務
  32. public void submit(Object job){
  33. workQueue.add(job);
  34. }
  35. //返回子任務結果集
  36. public Map<String ,Object> getResultMap(){
  37. return resultMap;
  38. }
  39. //開始運行所有的Worker進程,進行處理
  40. public void execute(){
  41. for(Map.Entry<String , Thread> entry:threadMap.entrySet()){
  42. entry.getValue().start();
  43. }
  44. }
  45. }

(2)Worker進程實現:

[java] view plain copy
  1. package MasterWorker;
  2. import java.util.Map;
  3. import java.util.Queue;
  4. public class Worker implements Runnable{
  5. //任務隊列,用於取得子任務
  6. protected Queue<Object> workQueue;
  7. //子任務處理結果集
  8. protected Map<String ,Object> resultMap;
  9. public void setWorkQueue(Queue<Object> workQueue){
  10. this.workQueue= workQueue;
  11. }
  12. public void setResultMap(Map<String ,Object> resultMap){
  13. this.resultMap=resultMap;
  14. }
  15. //子任務處理的邏輯,在子類中實現具體邏輯
  16. public Object handle(Object input){
  17. return input;
  18. }
  19. @Override
  20. public void run() {
  21. while(true){
  22. //獲取子任務
  23. Object input= workQueue.poll();
  24. if(input==null){
  25. break;
  26. }
  27. //處理子任務
  28. Object re = handle(input);
  29. resultMap.put(Integer.toString(input.hashCode()), re);
  30. }
  31. }
  32. }

(3)運用這個小框架計算1——100的立方和,PlusWorker的實現:

[java] view plain copy
  1. package MasterWorker;
  2. public class PlusWorker extends Worker {
  3. @Override
  4. public Object handle(Object input) {
  5. Integer i =(Integer)input;
  6. return i*i*i;
  7. }
  8. }

(4)進行計算的Main函數:

[java] view plain copy
  1. package MasterWorker;
  2. import java.util.Map;
  3. import java.util.Set;
  4. public class Main {
  5. /**
  6. * @param args
  7. */
  8. public static void main(String[] args) {
  9. //固定使用5個Worker,並指定Worker
  10. Master m = new Master(new PlusWorker(), 5);
  11. //提交100個子任務
  12. for(int i=0;i<100;i++){
  13. m.submit(i);
  14. }
  15. //開始計算
  16. m.execute();
  17. int re= 0;
  18. //保存最終結算結果
  19. Map<String ,Object> resultMap =m.getResultMap();
  20. //不需要等待所有Worker都執行完成,即可開始計算最終結果
  21. while(resultMap.size()>0 || !m.isComplete()){
  22. Set<String> keys = resultMap.keySet();
  23. String key =null;
  24. for(String k:keys){
  25. key=k;
  26. break;
  27. }
  28. Integer i =null;
  29. if(key!=null){
  30. i=(Integer)resultMap.get(key);
  31. }
  32. if(i!=null){
  33. //最終結果
  34. re+=i;
  35. }
  36. if(key!=null){
  37. //移除已經被計算過的項
  38. resultMap.remove(key);
  39. }
  40. }
  41. }
  42. }

三、總結:

Master-Worker模式是一種將串行任務並行化的方案,被分解的子任務在系統中可以被並行處理,同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果集。

轉:http://blog.csdn.net/lmdcszh/article/details/39698189

多線程:多線程設計模式(三):Master-Worker模式