因為一句話,秒懂二叉樹旋轉
在JDK7時,出現了一個新的框架用於並行執行任務,它的目的是為了把大型任務拆分為多個小任務,最後彙總多個小任務的結果,得到整大任務的結果,並且這些小任務都是同時在進行,大大提高運算效率。Fork就是拆分,Join就是合併。
我們來演示一下實際的情況,比如一個算式:18x7+36x8+9x77+8x53,可以拆分為四個小任務:18x7、36x8、9x77、8x53,最後我們只需要將這四個任務的結果加起來,就是我們原本算式的結果了,有點歸併排序的味道。
它不僅僅只是拆分任務並使用多執行緒,而且還可以利用工作竊取演算法,提高執行緒的利用
- 工作竊取演算法:是指某個執行緒從其他佇列裡竊取任務來執行。一個大任務分割為若干個互不依賴的子任務,為了減少執行緒間的競爭,把這些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和一對應。但是有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有任務待處理。幹完活的執行緒與其等著,不如幫其他執行緒幹活,於是它就去其他執行緒的佇列裡竊取一個任務來執行。
Fork/Join使用兩個類來完成以上兩件事情:
ForkJoinTask
我們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()
操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而只需要繼承它的子類,Fork/Join框架提供了以下兩個子類:
-
RecursiveAction:用於沒有返回結果的任務。
-
RecursiveTask :用於有返回結果的任務。
-
ForkJoinPool 需要通過
ForkJoinPool
來執行,任務分割出的子任務會新增到當前工作執行緒所維護的雙端佇列中,進入佇列的頭部。當一個工作執行緒的佇列裡暫時沒有任務時,它會隨機從其他工作執行緒的佇列的尾部獲取一個任務。
現在我們來看看如何使用它,這裡以計算1-1000的和為例,我們可以將其拆分為8個小段的數相加,比如1-125、126-250... ,最後再彙總即可,它也是依靠執行緒池來實現的:
public static void main(String[] args) throws InterruptedException, ExecutionException { ForkJoinPool pool = new ForkJoinPool(); System.out.println(pool.submit(new SubTask(1, 1000)).get()); } /** * 繼承RecursiveTask,這樣才可以作為一個任務,泛型就是計算結果型別 */ private static class SubTask extends RecursiveTask<Integer> { private final int start; //比如我們要計算一個範圍內所有數的和,那麼就需要限定一下範圍,這裡用了兩個int存放 private final int end; public SubTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { if(end - start > 125) { //每個任務最多計算125個數的和,如果大於繼續拆分,小於就可以開始算了 SubTask subTask1 = new SubTask(start, (end + start) / 2); subTask1.fork(); //會繼續劃分子任務執行 SubTask subTask2 = new SubTask((end + start) / 2 + 1, end); subTask2.fork(); //會繼續劃分子任務執行 return subTask1.join() + subTask2.join(); //越玩越有遞迴那味了 } else { System.out.println(Thread.currentThread().getName()+" 開始計算 "+start+"-"+end+" 的值!"); int res = 0; for (int i = start; i <= end; i++) { res += i; } return res; //返回的結果會作為join的結果 } } }
可以看到,結果非常正確,但是整個計算任務實際上是拆分為了8個子任務同時完成的,結合多執行緒,原本的單執行緒任務,在多執行緒的加持下速度成倍提升。
包括Arrays工具類提供的並行排序也是利用了ForkJoinPool來實現:
public static void parallelSort(byte[] a) { int n = a.length, p, g; if (n <= MIN_ARRAY_SORT_GRAN || (p = ForkJoinPool.getCommonPoolParallelism()) == 1) DualPivotQuicksort.sort(a, 0, n - 1); else new ArraysParallelSortHelpers.FJByte.Sorter (null, a, new byte[n], 0, n, 0, ((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ? MIN_ARRAY_SORT_GRAN : g).invoke(); }
並行排序的效能在多核心CPU環境下,肯定是優於普通排序的,並且排序規模越大優勢越顯著。