1. 程式人生 > 實用技巧 >併發處理之worker-master 模式

併發處理之worker-master 模式

worker-master模式是一種將順序執行的任務轉為併發執行,順序執行的任務之間相互之間沒有關係

如圖:

相關程式碼實現簡易版:

1)master 實現

package com.lwd.worker_master;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 *  分配任務/合併結果集
 * @author liuwd
 */
public class Master {
    /**
* 任務佇列 */ private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); /** * 工作程序 */ private Map<String,Thread> threadMap = new HashMap<>(16); /** * 子任務處理結果集 */ private Map<String,Object> resultMap = new HashMap<>(16);
public Master(Worker worker,int count){ worker.setWorkerQueue(queue); worker.setResultMap(resultMap); for (int i = 0; i < count; i++) { threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i))); } } /** * 是否子任務都結束了 * @return
*/ public boolean isComplte(){ Set<Map.Entry<String, Thread>> entries = threadMap.entrySet(); for (Map.Entry<String, Thread> entry : entries) { Thread thread = entry.getValue(); if(thread.getState()!=Thread.State.TERMINATED){ return false; } } return true; } /** * 提交任務 * @param obj */ public void submit(Object obj){ queue.add(obj); } /** * 返回結果集 * @return */ public Map<String,Object> getResultMap(){ return resultMap; } /** * 執行任務 開啟程序 */ public void execute(){ Set<Map.Entry<String, Thread>> entries = threadMap.entrySet(); for (Map.Entry<String, Thread> entry : entries) { entry.getValue().start(); } } }

2)worker實現

package com.lwd.worker_master;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 任務物件 用於處理相關任務
 * @author liuwd
 */
public class Worker implements Runnable{
    /**
     * 任務佇列
     */
    private ConcurrentLinkedQueue workerQueue;
    /**
     * 結果集
     */
    private Map<String,Object>  resultMap;


    public void setWorkerQueue(ConcurrentLinkedQueue workerQueue) {
        this.workerQueue = workerQueue;
    }


    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while (true){
            Object poll = workerQueue.poll();
            if(null == poll){
                break;
            }
            Object handle = handle(poll);
            resultMap.put(Integer.toString(handle.hashCode()),handle);
        }
    }

    /**
     * 處理任務
     * @param obj
     * @return
     */
    public Object handle(Object obj){
        return obj;
    }
}

3)RealWork實現

package com.lwd.worker_master;

/**
 *  實際任務類
 * @author liuwd
 */
public class RealWorker extends Worker {
    @Override
    public Object handle(Object obj) {
        Integer i = (Integer)obj;
        return i*i;
    }
}

4)WorkMasterMain.java 需求執行實現

package com.lwd.worker_master;

import java.util.Iterator;
import java.util.Map;


/**
 *  當前模式的使用主體類
 * @author liuwd
 */
public class WokerMasterMain {

    public static void main(String[] args) {
        Master master = new Master(new RealWorker(), 5);
        Integer integer = squaresSum(master, 100);
        System.out.println(integer);

    }

    /**
     * 1-100平方和
     */
    public static Integer squaresSum(Master master,int num){
        for (int i = 0; i <num ; i++) {
            master.submit(i);
        }
        master.execute();
        int result = 0;
        Map<String, Object> resultMap = master.getResultMap();
        while (resultMap.size()>0&&!master.isComplte()){
            Iterator<String> iterator = resultMap.keySet().iterator();
            while (iterator.hasNext()){
                String key = iterator.next();
                Object o = resultMap.get(key);
                if(null != o){
                    Integer i = (Integer)o;
                    result+=i;
                }
                iterator.remove();
            }

        }
        return result;
    }
}