深入理解[Master-Worker模式]原理與技術
Master-Worker模式是常用的並行模式之一。它的核心思想是,系統由兩類進程協作工作:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完成後,將結果返回給Master進程,由Master進程做歸納和匯總,從而得到系統的最終結果,其處理過程如圖1所示。
Master-Worker模式的好處,它能夠將一個大任務分解成若幹個小任務,並且執行,從而提高系統的吞吐量。而對於系統請求者Client來說,任務一旦提交,Master進程會分配任務並立即返回,並不會等待系統全部處理完成後再返回,其處理過程是異步的。因此Client不會出現等待現象。
1.Master-Worker的模式結構
Master-Worker模式是一種使用多線程進行數據結構處理的結構。
Master進程為主要進程,它維護了一個Worker進程隊列、子任務隊列和子結果集。Worker進程隊列中的Worker進程,不停地從任務隊列中提取要處理的子任務,並將子任務的處理結果寫入結果集。
2.Master-Worker的代碼實現
基於以上的思路實現一個簡易的master-worker框架。其中Master部分的代碼如下:
public class Master { //任務隊列 protected Queue<Object> workQuery = new ConcurrentLinkedQueue<Object>(); //worker進程隊列 protected Map<String, Thread> threadMap = new HashMap<>(); //子任務處理結果集 protected Map<String, Object> resultMap = new ConcurrentHashMap<>(); //是否所有的子任務都結束了 public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true; } //Master 的構造,需要一個Worker 進程邏輯,和需要的Worker進程數量 public Master(Worker worker,int countWorker){ worker.setWorkQueue(workQuery); worker.setResultMap(resultMap); for (int i = 0; i < countWorker; i++) { threadMap.put(Integer.toString(i),new Thread(worker)); } } //提交一個任務 public void submit(Object job){ workQuery.add(job); } //返回子任務結果集 public Map<String,Object> getResultMap(){ return resultMap; } //開始運行所有的worker進程,進行處理 public void execute(){ for (Map.Entry<String,Thread> entry : threadMap.entrySet()){ entry.getValue().start(); } } }
對應的Worker進程的代碼實現:
public class Worker implements Runnable { //任務隊列,用於取得子任務 protected Queue<Object> workQueue; //子任務處理結果集 protected Map<String, Object> resultMap; public void setWorkQueue(Queue<Object> workQueue) { this.workQueue = workQueue; } public void setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; } //子任務處理的邏輯,在子類中實現具體邏輯 public Object handle(Object input) { return input; } @Override public void run() { while (true) { //獲取子任務 Object input = workQueue.poll(); if (input == null) { break; } //處理子任務 Object re = handle(input); //將處理結果寫入結果集 resultMap.put(Integer.toString(input.hashCode()), re); } } }
以上兩段代碼已經展示了Master-Worker框架的全貌。應用程序通過重載 Worker.handle() 方法實現應用層邏輯。
例如,要實現計算1+2+..+100的結果,代碼如下:
public class PlusWorker extends Worker {
@Override
public Object handle(Object input) {
Integer i = (Integer) input;
return i+1;
}
public static void main(String[] args) {
Master master = new Master(new PlusWorker(), 5);
for (int i = 0; i < 100; i++) {
master.submit(i); //提交一百個子任務
}
master.execute(); //開始計算
int re = 0;
Map<String, Object> resultMap = master.getResultMap();
while (resultMap.size() > 0 || !master.isComplete()) {
Set<String> keys = resultMap.keySet();
String key = null;
for (String k : keys) {
key = k;
break;
}
Integer i = null;
if (key != null) {
i = (Integer) resultMap.get(key); //從結果集中獲取結果
}
if (i != null) {
re += i; //最終結果
}
if (key != null) {
resultMap.remove(key); //移除已經被計算過的項
}
}
System.out.println("result: " + re);
}
}
運行結果:
result: 5050
在應用層代碼中,創建了5個Worker工作進程和Worker工作實例PlusWorker。在提交了100個子任務後,便開始子任務的計算。這些子任務中,由生成的5個Worker進程共同完成。Master並不等待所有的Worker執行完畢,就開始訪問子結果集進行最終結果的計算,直到子結果集中所有的數據都被處理,並且5個活躍的Worker進程全部終止,才給出最終計算結果。
Master-Worker模式是一種將串行任務並行化的方法,被分解的子任務在系統中可以被並行處理。同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果。
3.Amino框架提供的Master-Worker模式
在Amino框架中為Master-Worker模式提供了較為完善的實現和便捷的操作接口。Amino實現了兩套Master-Worker實現:一種是靜態的Master-Worker實現,另一種是動態實現。
靜態實現不允許在任務開始時添加新的子任務,而動態的Master-Worker允許在任務執行過程中,由Master或Worker添加新的子任務。
在Amino框架中,MasterWorkerFactory.newStatic(new Pow3(),20)
用於創建靜態的Master-Worker模式,
第二個參數為Worker線程數,第一個參數為執行的任務類,該類需實現Doable<Integer,Integer>
接口,該接口泛型的第一個類型為任務方法的參數類型,第二個類型為方法返回類型。MasterWorkerFactory.newDynamic(new Pow3Dyn())
用於創建動態的Master-Worker模式,其中參數為實現DynamicWorker
接口的實例。
submit()
方法用於提交應用層任務,execute()
方法將執行所有任務。
Amino框架需要自行下載,下載地址:https://sourceforge.net/projects/amino-cbbs/files/cbbs/0.5.3/,找到cbbs-java-bin-0.5.3.tar.gz 下載即可。
下面用Amino框架演示1+2+..+100的完整示例。
public class Pow3 implements Doable<Integer,Integer> {
@Override
public Integer run(Integer input) {
//業務邏輯
return input;
}
}
public class Pow3Dyn implements DynamicWorker<Integer,Integer> {
@Override
public Integer run(Integer integer, WorkQueue<Integer> workQueue) {
//業務邏輯
return integer;
}
}
public class AminoDemo {
/
* Amino 框架提供開箱即用的Master-Worker模式
* 其它用法參考API文檔
*/
public static void main(String[] args) {
new AminoDemo().testDynamic();
new AminoDemo().testStatic();
}
/
* 靜態模式,不允許在任務開始後添加新的任務
*/
public void testStatic(){
MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newStatic(new Pow3(),20);//靜態模式,可指定線程數
List<MasterWorker.ResultKey> keyList = new Vector<>();
for (int i = 1; i <= 100; i++) {
keyList.add(mw.submit(i)); //傳參並調度任務,key用於取得任務結果
}
mw.execute();//執行所有任務
int re = 0;
while (keyList.size()> 0){ //不等待全部執行完成,就開始求和
MasterWorker.ResultKey k = keyList.get(0);
Integer i = mw.result(k); //由Key取得一個任務結果
if (i!=null){
re+=i;
keyList.remove(0); //累加完成後
}
}
System.out.println("result:"+re);
mw.shutdown();//關閉master-worker,釋放資源
}
/
* 動態模式,可在開始執行任務後繼續添加任務
*/
public void testDynamic(){
MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newDynamic(new Pow3Dyn());//動態模式,可指定線程數
List<MasterWorker.ResultKey> keyList = new Vector<>();
for (int i = 1; i < 50; i++) {
keyList.add(mw.submit(i)); //傳參並調度任務,key用於取得任務結果
}
mw.execute();
for (int i = 50; i <= 100; i++) {
keyList.add(mw.submit(i)); //傳參並調度任務,key用於取得任務結果
}
int re = 0;
while (keyList.size()> 0){
MasterWorker.ResultKey k = keyList.get(0);
Integer i = mw.result(k); //由Key取得一個任務結果
if (i!=null){
re+=i;
keyList.remove(0); //累加完成後
}
}
System.out.println("result:"+re);
mw.shutdown();
}
}
運行結果:
result:5050
result:5050
MasterWorker類的方法摘要,其它請自行下載API文檔。cbbs-java-apidocs-0.5.3.tar.gz
方法摘要 | |
---|---|
boolean |
execute() Begin processing of the work items submitted. |
boolean |
execute(long timeout, java.util.concurrent.TimeUnit unit) Begin processing of the work items submitted. |
void |
finished() Indicate to the master/worker that there is not more work coming. |
java.util.Collection<T> |
getAllResults() Obtain all of the results from the processing work items. |
boolean |
isCompleted() Poll an executing master/worker for completion. |
boolean |
isStatic() Determine if a master/worker is static. |
int |
numWorkers() Get the number of active workers. |
T |
result(MasterWorker.ResultKey k) Obtain the results from the processing of a work item. |
void |
shutdown() Shutdown the master/worker. |
MasterWorker.ResultKey |
submit(S w) Submit a work item for processing. |
MasterWorker.ResultKey |
submit(S w, long timeout, java.util.concurrent.TimeUnit unit) Submit a work item for processing and block until it is either submitted successfully or the specified timeout period has expired. |
boolean |
waitForCompletion() Wait until all workers have completed. |
boolean |
waitForCompletion(long timeout, java.util.concurrent.TimeUnit unit) Wait until all workers have completed or the specified timeout period expires. |
深入理解[Master-Worker模式]原理與技術