java多執行緒Master-worker模式
Master-Worker模式是常用的平行計算模式。它的核心思想是系統由兩類程序協作工作:Master程序和Worker程序。Master負責接收和分配任務,Worker負責處理子任務。當各個Worker子程序處理完成後,會將結果返回給Master,由Master做歸納和總結。其好處是能將一個大任務分解成若干個小任務,並行執行,從而提高系統的吞吐量。
主要模組分為:
Master:中控排程作用,起到分配任務,彙總結果集。包含ConcurrentLinkedQueue容器承裝所有的任務,HashMap<String,Thread>容器承裝所有的worker物件,ConcurrentHashMap<String,Object>併發容器裝載每個worker併發處理任務的結果集。
worker:實現Runnable介面或繼承Thread實現多執行緒。每個worker物件需要有master的ConcurrentLinkedQueue的引用用於獲取任務,每個worker物件需要有ConcurrentHashMap<String,Object>的引用用於承裝返回結果
Main:呼叫Master進行測試
Task:封裝任務
示例程式碼:
Master類:
public class Master {
//1.承裝任務的容器
private ConcurrentLinkedQueue<Task> workQueue=new ConcurrentLinkedQueue<>();
//2.使用HashMap承裝所有的worker物件
private HashMap<String, Thread> workers=new HashMap<>();
//3.使用一個容器承裝每一個worker並行執行任務的結果集
private ConcurrentHashMap<String, Object> resultMap=new ConcurrentHashMap<>();
//4.構造方法(執行任務的worker物件,和建立worker的執行緒數)
public Master(Worker worker,int workerCount) {
//每一個worker物件都需要有Master的引用workerQueue用於任務的領取,resultMap用於任務的提交
worker.setWorkerQueue(this.workQueue);
worker.setResultMap(this.resultMap);
for (int i = 0; i < workerCount; i++) {
//key代表每個worker的名稱,value表示每個worker
workers.put("子節點"+i, new Thread(worker));
}
}
//5.提交方法
public void submit(Task task) {
this.workQueue.add(task);
}
//6.需要一個執行方法(啟動應用程式 讓所有worker工作)
public void execute() {
for (Map.Entry<String, Thread> mt : workers.entrySet()) {
mt.getValue().start();
}
}
//7.判斷執行緒是否執行完畢
public boolean isComplete() {
for (Map.Entry<String, Thread> mt : workers.entrySet()) {
if(mt.getValue().getState()!=Thread.State.TERMINATED) {
return false;
}
}
return true;
}
//8.返回結果集資料
public int getResult() {
int ret=0;
for (Map.Entry<String, Object> mt : resultMap.entrySet()) {
ret+=(Integer)mt.getValue();
}
return ret;
}
}
Worker類:
public class Worker implements Runnable{
private ConcurrentLinkedQueue<Task> workQueue;
private ConcurrentHashMap<String, Object> resultMap;public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue=workQueue;
}
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap=resultMap;
}
@Override
public void run() {
while (true) {
Task input=workQueue.poll();
if(input==null) break;
//真正的去處理業務
Object output=handle(input);
this.resultMap.put(Integer.toString(input.getId()), output);
}
}
private Object handle(Task input) {
Object output=null;
try {
//表示處理Task任務的耗時,可能是資料的加工,也可能是操作資料庫。。。。
Thread.sleep(500);
output=input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}}
Task類:
public class Task {
private int id;
private String name;
private int price;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
@Override
public String toString() {
return "Task [id=" + id + ", name=" + name + ", price=" + price + "]";
}
}
Main類:
public class Main {
public static void main(String[] args) {
Worker worker=new Worker();
Master master=new Master(worker, 20);
Random r=new Random();
for (int i = 1; i <= 100; i++) {
Task task=new Task();
task.setId(i);
task.setName("任務"+i);
task.setPrice(r.nextInt(1000));
master.submit(task);
}
master.execute();
long start=System.currentTimeMillis();
while(true) {
if(master.isComplete()) {
long end=System.currentTimeMillis()-start;
int ret=master.getResult();
System.out.println("最終結果:"+ret+"執行耗時:"+end);
break;
}
}
}
}
執行結果:
最終結果:50622執行耗時:2501