併發處理之worker-master 模式
阿新 • • 發佈:2020-10-08
worker-master 模式是一種將原本順序執行的任務轉為併發執行的模式,前提是這些任務相互之間沒有依賴關係。
如圖:
相關程式碼實現簡易版:
1)master 實現
package com.lwd.worker_master;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Distributes tasks to worker threads and exposes the merged result set.
 *
 * <p>Tasks are handed in through {@link #submit(Object)}, the worker threads are
 * started by {@link #execute()}, and the results they produce are published in
 * the map returned by {@link #getResultMap()}.
 *
 * @author liuwd
 */
public class Master {

    /** Task queue shared with the worker. */
    private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();

    /** Worker threads, keyed by their thread name ("0" .. "count-1"). */
    private final Map<String, Thread> threadMap = new HashMap<>(16);

    /**
     * Results of the sub-tasks. All worker threads write into this map at the
     * same time, so it has to be a concurrent map: a plain HashMap can lose
     * entries or be corrupted by unsynchronized concurrent puts.
     */
    private final Map<String, Object> resultMap = new ConcurrentHashMap<>(16);

    /**
     * Wires the worker to this master's task queue and result map and creates
     * {@code count} threads that all run the same worker instance. The threads
     * are not started here; see {@link #execute()}.
     *
     * @param worker the task handler run by every thread
     * @param count  the number of worker threads to create
     */
    public Master(Worker worker, int count) {
        worker.setWorkerQueue(queue);
        worker.setResultMap(resultMap);
        for (int i = 0; i < count; i++) {
            String threadName = Integer.toString(i);
            threadMap.put(threadName, new Thread(worker, threadName));
        }
    }

    /**
     * Reports whether every worker thread has terminated.
     *
     * @return {@code true} if all worker threads are in state
     *         {@link Thread.State#TERMINATED}; {@code false} otherwise, which
     *         includes the time before {@link #execute()} has been called
     */
    public boolean isComplete() {
        for (Thread thread : threadMap.values()) {
            if (thread.getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }

    /**
     * Same as {@link #isComplete()}; the misspelled name is kept so that
     * existing callers continue to compile.
     *
     * @return {@code true} if all worker threads have terminated
     */
    public boolean isComplte() {
        return isComplete();
    }

    /**
     * Adds a task to the queue.
     *
     * @param obj the task; must not be {@code null} because the underlying
     *            queue rejects null elements
     */
    public void submit(Object obj) {
        queue.add(obj);
    }

    /**
     * Returns the live result map that the worker threads write into.
     *
     * @return the shared result map (not a copy)
     */
    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    /**
     * Starts every worker thread.
     */
    public void execute() {
        for (Thread thread : threadMap.values()) {
            thread.start();
        }
    }
}
2)worker實現
package com.lwd.worker_master;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Task handler: takes tasks from a queue, processes each one with
 * {@link #handle(Object)} and stores the outcome in a result map.
 *
 * <p>Subclasses override {@link #handle(Object)} to supply the real work.
 *
 * @author liuwd
 */
public class Worker implements Runnable {

    /** Task queue to poll from. */
    private ConcurrentLinkedQueue<?> workerQueue;

    /** Result set to publish into. */
    private Map<String, Object> resultMap;

    /**
     * Source of result keys. It is atomic because one worker instance may be
     * run by several threads at once, and every stored result needs its own key.
     */
    private final AtomicLong nextResultId = new AtomicLong();

    /**
     * Sets the queue this worker takes its tasks from.
     *
     * @param workerQueue the task queue
     */
    public void setWorkerQueue(ConcurrentLinkedQueue<?> workerQueue) {
        this.workerQueue = workerQueue;
    }

    /**
     * Sets the map this worker stores its results in.
     *
     * @param resultMap the result map
     */
    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * Processes tasks until the queue is empty.
     *
     * <p>Each non-null result is stored under a key taken from a sequence
     * counter. Keys derived from the result's hash code would make equal
     * results (or results with colliding hash codes) overwrite each other, so
     * results would be lost.
     */
    @Override
    public void run() {
        Object task;
        while ((task = workerQueue.poll()) != null) {
            Object result = handle(task);
            if (result == null) {
                // Nothing to publish for this task; skipping also keeps the
                // worker usable with result maps that reject null values.
                continue;
            }
            resultMap.put(Long.toString(nextResultId.getAndIncrement()), result);
        }
    }

    /**
     * Processes one task. The default implementation returns the task itself.
     *
     * @param obj the task taken from the queue
     * @return the result for this task; a {@code null} result is not stored
     */
    public Object handle(Object obj) {
        return obj;
    }
}
3)RealWorker 實現
package com.lwd.worker_master;

/**
 * Concrete worker whose task is squaring an integer.
 *
 * @author liuwd
 */
public class RealWorker extends Worker {

    /**
     * Squares the given task value.
     *
     * @param obj the task; cast to {@link Integer}
     * @return the square of the task value, boxed as an {@link Integer}
     */
    @Override
    public Object handle(Object obj) {
        final int value = (Integer) obj;
        return value * value;
    }
}
4)WokerMasterMain.java 需求執行實現
package com.lwd.worker_master;

import java.util.Iterator;
import java.util.Map;

/**
 * Entry point that demonstrates the worker-master pattern by summing squares.
 *
 * @author liuwd
 */
public class WokerMasterMain {

    /**
     * Prints the sum of the squares of 1..100, computed by five worker threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Master master = new Master(new RealWorker(), 5);
        Integer integer = squaresSum(master, 100);
        System.out.println(integer);
    }

    /**
     * Computes 1*1 + 2*2 + ... + num*num using the given master.
     *
     * <p>Submits the integers 1..num as tasks, starts the workers, waits until
     * all of them have terminated and then adds up the results. Each result is
     * removed from the master's result map as it is consumed.
     *
     * @param master the master that distributes the tasks; its worker is
     *               expected to produce {@link Integer} results
     * @param num    the upper bound (inclusive) of the numbers to square
     * @return the sum of the collected results
     */
    public static Integer squaresSum(Master master, int num) {
        // 1..num inclusive, matching the documented "1-100" range.
        for (int i = 1; i <= num; i++) {
            master.submit(i);
        }
        master.execute();

        // Wait for every worker before reading. Reading earlier can see an
        // empty or partial map and return a truncated sum, and it would also
        // iterate the map while the workers are still writing to it.
        while (!master.isComplte()) {
            Thread.yield();
        }

        int result = 0;
        Map<String, Object> resultMap = master.getResultMap();
        Iterator<Object> values = resultMap.values().iterator();
        while (values.hasNext()) {
            Object value = values.next();
            if (null != value) {
                result += (Integer) value;
            }
            values.remove();
        }
        return result;
    }
}