JUC執行緒框架深度解析 — 08、ForkJoinTask架構
阿新 • • 發佈:2019-02-04
【 ForkJoinTask分支任務 】
分之合併的設計思想主要是來自於CPU執行環境,ForkJoinTask是在JDK1.7之後追加到java之中的一個類庫,
【 沒有返回值的任務:RecursiveAction 】
很多的時候進行分支處理的時候有可能是沒有返回值的,所以可以修改一下任務的繼承父類”RecursiveAction”。
分之合併的設計思想主要是來自於CPU執行環境,ForkJoinTask是在JDK1.7之後追加到java之中的一個類庫,
其主要的功能是進行資源竊取功能的實現。
❥ 但需要注意的是,在分支合併任務的處理結構中一定要注意以下的幾點:
• 分支任務之中的資料的同步處理一定要有分支任務自己來完成,不要求進行額外的控制;
• 在進行分支處理操作的時候不要進行IO操作;
• 由於分支任務是捆綁一起執行的,如果出現了異常千萬別丟擲,會整體任務失敗。
❥ 在進行分支任務的處理之中主要使用如下的幾個核心類:
• ForkJoinTask : 進行分支合併任務的處理類;
• ForkJoinPool : 分支合併池。
【 分支任務類結構 】
【 有返回結果的任務:RecursiveTask 】
下面以一個簡單的1—100累加來實現一個分支的處理任務,如果要進行1—100的累加,
可以將任務分為兩個階段:1—50累加,第二個是進行51—100的累加。
範例:實現分支處理
整個的分支合併之中相當於就是一個執行緒池的擴充套件概念,在整體的計算之中,每一個分支都會產生一個新的執行緒物件進行計算,唯一的區別是,預設情況下的執行緒池是由使用者自己來設定的執行緒物件,而ForkJoin是由任務類自己根據情況進行拆分處理。import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class TestDemo { public static void main(String[] args) throws Exception { // 從0-100的累加處理操作 AddTask task = new AddTask(0, 100); ForkJoinPool pool = new ForkJoinPool() ; // 提交任務 Future<Integer> future = pool.submit(task); System.out.println(future.get()); } } @SuppressWarnings("serial") // 相當於繼承了ForkJoinTask父類 class AddTask extends RecursiveTask<Integer> { private int start; private int end; // 傳入計算的開始和結束的值 public AddTask(int start, int end) { this.start = start; this.end = end; } @Override // 是進行資料的分支處理操作 protected Integer compute() { // 儲存求和的計算結果 int sum = 0; if (end - start < 100) { // 開啟了分支 for (int x = start; x <= end; x++) { sum += x; } } else { int middle = (start + end) / 2; // 中間值 // 做0 - 50累加 AddTask leftTask = new AddTask(start, middle); // 做51 - 100累加 AddTask rightTask = new AddTask(middle + 1, end); // 表示開啟下一個分支計算,開啟的是computer() leftTask.fork(); // 表示開啟下一個分支計算,開啟的是computer() rightTask.fork(); // 把兩個的分支結果進行合併 sum = leftTask.join() + rightTask.join(); return sum; } return sum; } } // 輸出結果:5050
【 沒有返回值的任務:RecursiveAction 】
很多的時候進行分支處理的時候有可能是沒有返回值的,所以可以修改一下任務的繼承父類”RecursiveAction”。
範例:定義分支任務
計算完成了:5050import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MLDNTestDemo { public static void main(String[] args) throws Exception { CountSave save = new CountSave() ; // 從0-100的累加處理操作 AddTask task = new AddTask(save,0, 100); ForkJoinPool pool = new ForkJoinPool() ; pool.submit(task);// 提交任務 // 當前的任務沒有結束 while(!task.isDone()) { TimeUnit.MILLISECONDS.sleep(100); } // 分支任務計算完成 if (task.isCompletedNormally()) { System.out.println("計算完成了:" + save.getSum()); } } } class CountSave { // 儲存資料處理結果 private Lock lock = new ReentrantLock() ; private int sum = 0 ; // 儲存處理結果 public void add(int sum) { this.lock.lock(); try { this.sum += sum ; } finally { this.lock.unlock(); } } public int getSum() { return this.sum ; } } // 相當於繼承了ForkJoinTask父類 @SuppressWarnings("serial") class AddTask extends RecursiveAction { private int start; private int end; private CountSave save ; // 傳入計算的開始和結束的值 public AddTask(CountSave save,int start, int end) { this.save = save ; this.start = start; this.end = end; } @Override protected void compute() { // 開啟了分支 if (end - start < 100) { int sum = 0; // 儲存求和的計算結果 for (int x = start; x <= end; x++) { sum += x; } this.save.add(sum); // 儲存計算結果 } else { // 中間值 int middle = (start + end) / 2; // 做0 - 50累加 AddTask leftTask = new AddTask(this.save,start, middle); // 做51 - 100累加 AddTask rightTask = new AddTask(this.save,middle + 1, end); // 並行執行的任務 super.invokeAll(leftTask, rightTask); } } }
實際上在使用ForkJoinTask處理的時候還可以取得使用的執行緒的訊息。
public class TestDemo {
public static void main(String[] args) throws Exception {
CountSave save = new CountSave() ;
// 從0-100的累加處理操作
AddTask task = new AddTask(save,0, 100);
ForkJoinPool pool = new ForkJoinPool() ;
// 提交任務
pool.submit(task);
while(!task.isDone()) { // 當前的任務沒有結束
System.out.println("活躍執行緒:" + pool.getActiveThreadCount()
+ "、最大的併發執行緒數:" + pool.getParallelism());
// TimeUnit.MILLISECONDS.sleep(100);
}
if (task.isCompletedNormally()) { // 分支任務計算完成
System.out.println("計算完成了:" + save.getSum());
}
}
}
活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
計算完成了:5050
也就是說如果你現在要考慮所有底層的設計因素,那麼就必須針對當前的主機硬體環境做出判斷後才能夠寫出
良好的分支處理操作。