1. 程式人生 > >java併發Fork/join框架-java併發程式設計的藝術

java併發Fork/join框架-java併發程式設計的藝術

package testforkandjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

//RecursiveTask extends ForkJoinTask 用於有返回結果的任務
//RecursiveAction extends ForkJoinTask 用於沒有返回結果的任務
//ForkJoinTask需要通過ForkJoinPool來執行
/**
 * 
 * 工作竊取演算法
 * 幹完活的執行緒,幫助其他執行緒幹活,造成競爭,所以使用雙端佇列,被竊取任務執行緒從雙端佇列的頭部拿任務執行,而竊取任務的執行緒從雙端佇列的尾部拿任務執行
 * 優點:充分利用執行緒進行平行計算,減少了執行緒間的競爭
 * 缺點:當雙端佇列裡只有一個任務時,存在競爭。該演算法會消耗過多的系統資源,建立了多個執行緒和雙端佇列
 * @ClassName: CountTask 
 * @Description: TODO(當一個工作執行緒的佇列中暫時沒有任務時,它會隨即從其他執行緒的佇列尾部獲取一個任務) 
 * @author 夢境迷離  
 * @date 2017-8-7 下午5:00:38 
 *
 */
public class CountTask extends RecursiveTask<Integer> {

	/** 
	 * @Fields serialVersionUID : TODO(用一句話描述這個變量表示什麼) 
	 */ 
	
	private static final long serialVersionUID = 1L;
	/** 
	 * @Title: main 
	 * @Description: TODO(這裡用一句話描述這個方法的作用) 
	 * @param @param args    設定檔案 
	 * @return void    返回型別 
	 * @throws 
	 */
	
	private static final int THRESGOLD = 2;//閾值
	private int start;
	private int end;
	
	public CountTask(int start, int end){
		this.start = start;
		this.end = end;
	}
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		//生成計算任務,負責計算1+2+3+4
		CountTask task = new CountTask(1, 4);
		//執行一個任務
		Future<Integer> result  =  forkJoinPool.submit(task);
		if(task.isCompletedAbnormally()){
			System.out.println(task.getException());//檢查是否跑出異常,或已經取消任務,取消:CancellationException,未完成/沒有異常:null
		}
		try {
			System.out.println(result.get());
			
		} catch (Exception e) {
			// TODO: handle exception
		}

	}

	@Override
	protected Integer compute() {
		int sum = 0;
		//如果任務足夠小就計算任務
		boolean canCompute = (end-start)<=THRESGOLD;
		if(canCompute){
			for(int i=start; i<=end; i++){
				sum+=i;
			}
		}else {
			//如果任務大於閾值,就分裂成兩個小任務計算
			int middle = (start+end)/2;
			CountTask lefTask = new CountTask(start, middle);
			CountTask rightTask = new CountTask(middle+1, end);
			//執行子任務
			//子任務1負責執行1+2,子任務2負責執行3+4
			lefTask.fork();
			rightTask.fork();
			//等待子任務執行完,並得到結果
			int leftResult = lefTask.join();
			int rightResult = rightTask.join();
			//合併任務
			sum = leftResult+rightResult;
			
		}
		
		return sum;
	}

}
/**
 * ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成
 * ForkJoinTask陣列負責將存放程式交給ForkJoinPool的任務,而ForkJoinWorkerThread陣列負責執行這些任務
 * 
 * ForkJoinTask的fork方法內部呼叫了ForkJoinWorkerThread的pushTask方法,pushTaks方法把當前任務存放在ForkJoinTask陣列佇列中,
 * 然後呼叫ForkJoinPool的signalWork()方法喚醒或建立一個工作執行緒來執行任務
 * 
 * ForkJoinTaskd join方法內部呼叫了doJoin方法,通過該方法得到任務的狀態,
 * 1、已完成--返回結果
 * 2、被取消--跑出Cancellation異常
 * 3、訊號--
 * 4、異常--直接丟擲對應的異常
 * doJoin方法中,
 * 1、檢視任務狀態
 * 	-是否執行完成
 * 		是-返回任務狀態
 * 		否-從任務陣列中取出任務並執行
 * 	-順利完成--狀態設定為normal
 * 	-出現異常--記錄異常,狀態設定為exceptional
 * 			
 * 
 */