1. 程式人生 > >14.多執行緒設計模式 - Master-Worker模式

14.多執行緒設計模式 - Master-Worker模式

多執行緒設計模式 - Master-Worker模式

 併發設計模式屬於設計優化的一部分,它對於一些常用的多執行緒結構的總結和抽象。與序列相比並行程式結構通常較為複雜,因此合理的使用並行模式在多執行緒併發中更具有意義。


1. Master-Worker模式

  • - Master-Worker模式是常用的並行模式。它的核心思想是系統由兩類程序協作工作:Master程序和Worker程序。Master負責接收和分配任務,Worker負責處理子任務。當各個Worker子程序處理完成後,會將結果返回給Master,由Master做歸納和總結。
  • - 其好處是能將一個大任務分解成若干個小任務,並行執行,從而提高系統的吞吐量。

示例:下
說明:看類註釋即可明瞭 不明白可以去屎了

  1 //Task.java  
  2   public class Task {
  3 
  4       private int id;
  5       private String name;
  6       private int price;
  7       public int getId() {
  8           return id;
  9       }
 10       public void setId(int id) {
 11           this.id = id;
 12
} 13 public String getName() { 14 return name; 15 } 16 public void setName(String name) { 17 this.name = name; 18 } 19 public int getPrice() { 20 return price; 21 } 22 public void setPrice(int price) { 23 this
.price = price; 24 } 25 } 26 //Worker.java 27 public class Worker implements Runnable{ 28 29 private ConcurrentLinkedQueue<Task> workQueue; 30 private ConcurrentHashMap<String,Object> resultMap; 31 32 @Override 33 public void run() { 34 while(true){ 35 Task input = this.workQueue.poll(); 36 if(input==null)break; 37 //真正的去做業務處理 38 Object output = MyWorker.handle(input); 39 this.resultMap.put(Integer.toString(input.getId()), output); 40 } 41 } 42 43 public static Object handle(Task input){ 44 return null; 45 } 46 47 public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) { 48 this.workQueue=workQueue; 49 } 50 51 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { 52 this.resultMap=resultMap; 53 54 } 55 } 56 57 //MyWorker.java繼承Worker.java,重寫handle方法 58 public class MyWorker extends Worker{ 59 60 public static Object handle(Task input) { 61 Object output = null; 62 try { 63 //處理task的耗時,可能是資料的加工,也可能是操作資料庫 64 Thread.sleep(500); 65 output = input.getPrice(); 66 } catch (InterruptedException e) { 67 e.printStackTrace(); 68 } 69 return output; 70 } 71 72 } 73 //Master.java * 74 public class Master { 75 76 //1 應該有一個承裝任務的集合 77 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); 78 //2 使用普通的HashMap去承裝所有的worker物件 79 private HashMap<String,Thread> workers = new HashMap<String,Thread>(); 80 //3 使用一個容器承裝每一個worker併發執行的結果集 81 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>(); 82 //4 構造方法 83 public Master(Worker worker,int workerCount){ 84 //每一個worker物件都需要有master的應用workQueue用於任務的領取,resultMap用於任務的提交 85 worker.setWorkerQueue(this.workQueue); 86 worker.setResultMap(this.resultMap); 87 88 for(int i=0;i<workerCount;i++){ 89 //key表示每一個worker的名字,value表示執行緒執行物件 90 workers.put("子節點"+Integer.toString(i), new Thread(worker)); 91 } 92 } 93 //5 提交方法 94 public void submit(Task task){ 95 this.workQueue.add(task); 96 } 97 //6 需要有一個執行的方法(啟動應用程式讓所有的worker工作) 98 public void execute(){ 99 for(Map.Entry<String,Thread> me:workers.entrySet()){ 100 me.getValue().start(); 101 } 102 } 103 //8 判斷執行緒是否執行完畢 104 public boolean isComplete() { 105 for(Map.Entry<String, Thread> me:workers.entrySet()){ 106 if(me.getValue().getState()!=Thread.State.TERMINATED){ 107 return false; 108 } 109 } 110 return true; 111 } 112 //9 返回結果集資料 113 public int getResult() { 114 int ret = 0; 115 for(Map.Entry<String,Object> me:resultMap.entrySet()){ 116 //彙總邏輯 117 ret+=(Integer)me.getValue(); 118 } 119 return ret; 120 } 121 } 122 //主函式 123 public class Main { 124 125 public static void main(String[] args) { 126 System.out.println("我的機器可用processor的數量:"+Runtime.getRuntime().availableProcessors()); 127 Master master = new Master(new MyWorker(),10); 128 129 Random r = new Random(); 130 for(int i=0;i<100;i++){ 131 Task task = new Task(); 132 task.setId(i); 133 task.setName("任務"+i); 134 task.setPrice(r.nextInt(1000)); 135 master.submit(task); 136 } 137 master.execute(); 138 139 long start = System.currentTimeMillis(); 140 while(true){ 141 if(master.isComplete()){ 142 long end = System.currentTimeMillis() -start; 143 int ret = master.getResult(); 144 System.out.println("最終結果:"+ret+",執行耗時:"+end+"毫秒"); 145 break; 146 } 147 } 148 } 149 150 }