java併發Fork/join框架-java併發程式設計的藝術
阿新 • • 發佈:2019-01-30
package testforkandjoin; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; //RecursiveTask extends ForkJoinTask 用於有返回結果的任務 //RecursiveAction extends ForkJoinTask 用於沒有返回結果的任務 //ForkJoinTask需要通過ForkJoinPool來執行 /** * * 工作竊取演算法 * 幹完活的執行緒,幫助其他執行緒幹活,造成競爭,所以使用雙端佇列,被竊取任務執行緒從雙端佇列的頭部拿任務執行,而竊取任務的執行緒從雙端佇列的尾部拿任務執行 * 優點:充分利用執行緒進行平行計算,減少了執行緒間的競爭 * 缺點:當雙端佇列裡只有一個任務時,存在競爭。該演算法會消耗過多的系統資源,建立了多個執行緒和雙端佇列 * @ClassName: CountTask * @Description: TODO(當一個工作執行緒的佇列中暫時沒有任務時,它會隨即從其他執行緒的佇列尾部獲取一個任務) * @author 夢境迷離 * @date 2017-8-7 下午5:00:38 * */ public class CountTask extends RecursiveTask<Integer> { /** * @Fields serialVersionUID : TODO(用一句話描述這個變量表示什麼) */ private static final long serialVersionUID = 1L; /** * @Title: main * @Description: TODO(這裡用一句話描述這個方法的作用) * @param @param args 設定檔案 * @return void 返回型別 * @throws */ private static final int THRESGOLD = 2;//閾值 private int start; private int end; public CountTask(int start, int end){ this.start = start; this.end = end; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); //生成計算任務,負責計算1+2+3+4 CountTask task = new CountTask(1, 4); //執行一個任務 Future<Integer> result = forkJoinPool.submit(task); if(task.isCompletedAbnormally()){ System.out.println(task.getException());//檢查是否跑出異常,或已經取消任務,取消:CancellationException,未完成/沒有異常:null } try { System.out.println(result.get()); } catch (Exception e) { // TODO: handle exception } } @Override protected Integer compute() { int sum = 0; //如果任務足夠小就計算任務 boolean canCompute = (end-start)<=THRESGOLD; if(canCompute){ for(int i=start; i<=end; i++){ sum+=i; } }else { //如果任務大於閾值,就分裂成兩個小任務計算 int middle = (start+end)/2; CountTask lefTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle+1, end); //執行子任務 //子任務1負責執行1+2,子任務2負責執行3+4 lefTask.fork(); rightTask.fork(); //等待子任務執行完,並得到結果 int leftResult = lefTask.join(); int rightResult = rightTask.join(); //合併任務 sum = leftResult+rightResult; } return sum; } } /** * ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成 * ForkJoinTask陣列負責將存放程式交給ForkJoinPool的任務,而ForkJoinWorkerThread陣列負責執行這些任務 * * ForkJoinTask的fork方法內部呼叫了ForkJoinWorkerThread的pushTask方法,pushTaks方法把當前任務存放在ForkJoinTask陣列佇列中, * 然後呼叫ForkJoinPool的signalWork()方法喚醒或建立一個工作執行緒來執行任務 * * ForkJoinTaskd join方法內部呼叫了doJoin方法,通過該方法得到任務的狀態, * 1、已完成--返回結果 * 2、被取消--跑出Cancellation異常 * 3、訊號-- * 4、異常--直接丟擲對應的異常 * doJoin方法中, * 1、檢視任務狀態 * -是否執行完成 * 是-返回任務狀態 * 否-從任務陣列中取出任務並執行 * -順利完成--狀態設定為normal * -出現異常--記錄異常,狀態設定為exceptional * * */