多執行緒分派任務的MW元件初步設想?持續更新...
阿新 • • 發佈:2019-01-08
public class Master<E> {
//存放E型別任務的容器,只對taskQuueu進行寫
private volatile BlockingQueue<? super E> TaskQueue;
//存放每一個worker任務結果的集合,只對result進行讀
private volatile BlockingQueue<? super Object> resultQueue;
//存放worker的合集
private volatile Map<String,Callable> wokersMap;
//執行任務的執行緒池
private ThreadPoolExecutor threadPool;
//存放執行結果
//該Master執行完成次數
//TODO
private AtomicInteger finishNum = new AtomicInteger(-1);
//初始化capacity
private static Integer DEFAULT_CAPACITY = 1200;
//使用鎖
private volatile Lock lock = new ReentrantLock();
//任務的future佇列
private List<Future> futureList = new ArrayList<>();
//整個master標誌
private AtomicBoolean flag = new AtomicBoolean(false);
//日誌
private final static Logger logger = LoggerFactory.getLogger(Master.class);
/**
* 構造一個大小為30,任務容量為capacity的執行緒池,初始化workQueue和workerMap
* @param worker
* @param workThreads
* @param capacity
*/
public Master(Worker worker, Integer workThreads, Integer capacity){
threadPool = new ThreadPoolExecutor(
10,
30,
1000L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(capacity),
new MWRejection());
TaskQueue = new ArrayBlockingQueue<E>(capacity);
wokersMap = new HashMap();
resultQueue = new LinkedBlockingQueue<Object> ();
worker.setTaskQueue(this.TaskQueue);
worker.setResultQueue(this.resultQueue);
for (Integer i = 0; i < workThreads; i++) {
wokersMap.put(Integer.toString(i),worker);
}
logger.info("{}個worker已經啟動",workThreads);
}
//構造
public Master(Worker worker, Integer workThreads){
this(worker,workThreads,DEFAULT_CAPACITY);
}
//執行任務
public void executeThread(){
wokersMap.forEach((k,v)->{
Future future = threadPool.submit(v);
futureList.add(future);
});
}
//提交任務
public void submitTask(E task){
lock.lock();
try {
this.TaskQueue.put(task);
logger.info("task add {}",task.toString());
}catch (Exception e){
System.out.println("提交失敗");
logger.error("{}提交失敗",task);
}finally {
lock.unlock();
}
}
//過程中間判斷,非阻塞
public boolean isComplete(){
lock.lock();
try{
if(futureList != null)
for (Future f : futureList) {
if (!f.isDone()) {
//任務尚未做完
return false;
}
}
return true;
}catch (Exception e){
return flag.get();
}finally {
System.out.println("準備釋放lock");
lock.unlock();
}
}
//最終狀態判斷,阻塞
public Boolean get(){
lock.lock();
try {
flag.set(true);
for (Future f : futureList) {
if(!(Boolean) f.get()){
flag.set(false);
System.out.println("future failure");
}
}
} catch (Exception e) {
flag.set(false);
e.printStackTrace();
}finally {
lock.unlock();
}
threadPool.shutdownNow();
System.out.println("完成了"+flag.get());
return flag.get();
}
//計算結果方法
/**
* 將Worker型別傳入
* @param
* @return
*/
public Integer getFinishedSum(){
lock.lock();
Integer sum = 0;
try{
if (this.get()){
if( resultQueue != null/* && threadPool.isTerminated()*/){
this.finishNum.incrementAndGet();
for (Object v:
resultQueue) {
if(v != null){
// this.finishNum.incrementAndGet();
sum += (Integer) v;
}
}
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return sum;
}
}
@ConcurrentSafe
public abstract class Worker<E> implements Callable {
//只對taskQueue進行讀
private BlockingQueue<? extends E> taskQueue;
//只對resultQueue進行寫
private BlockingQueue<? super Object> resultQueue;
//使用鎖
private volatile Lock lock = new ReentrantLock();
//統計校驗
private static AtomicLong TaskID = new AtomicLong(0L);
public void setResultQueue(BlockingQueue<Object> resultQueue) {
this.resultQueue = resultQueue;
}
public void setTaskQueue(BlockingQueue<E> taskQueue) {
this.taskQueue = taskQueue;
}
private E getOne(){
lock.lock();
try {
if(taskQueue.size() != 0 && !taskQueue.isEmpty())
return this.taskQueue.poll();
return null;
} catch (Exception e) {
e.printStackTrace();
return null;
}finally {
lock.unlock();
}
}
public Boolean call() {
while(true){
try {
E task = getOne();
if(task == null){
break;
}
Object result = handle(task);
resultQueue.put(result);
TaskID.incrementAndGet();
}catch (Exception e){
e.printStackTrace();
return false;
}
}
return true;
}
//處理任務的方法
public abstract Object handle(E task);
//獲取任務的型別
public abstract Class<E> getTaskClass();
public static long getTaskID(){
return TaskID.get();
}
}