併發程式設計之:ForkJoin
大家好,我是小黑,一個在網際網路苟且偷生的農民工。
在JDK1.7中引入了一種新的Fork/Join執行緒池,它可以將一個大的任務拆分成多個小的任務並行執行並彙總執行結果。
Fork/Join採用的是分而治之的基本思想,分而治之就是將一個複雜的任務,按照規定的閾值劃分成多個簡單的小任務,然後將這些小任務的結果再進行彙總返回,得到最終的任務。
分治法
分治法是計算機領域常用的演算法中的其中一個,主要思想就是將將一個規模為N的問題,分解成K個規模較小的子問題,這些子問題相互獨立且與原問題性質相同;求解出子問題的解,合併得到原問題的解。
解決問題的思路
- 分割原問題;
- 求解子問題;
- 合併子問題的解為原問題的解。
使用場景
二分查詢,階乘計算,歸併排序,堆排序、快速排序、傅立葉變換都用了分治法的思想。
ForkJoin並行處理框架
在JDK1.7中推出的ForkJoinPool執行緒池,主要用於ForkJoinTask
任務的執行,ForkJoinTask
是一個類似執行緒的實體,但是比普通執行緒更輕量。
我們來使用ForkJoin框架完成以下1-10億求和的程式碼。
public class ForkJoinMain { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> rootTask = forkJoinPool.submit(new SumForkJoinTask(1L, 10_0000_0000L)); System.out.println("計算結果:" + rootTask.get()); } } class SumForkJoinTask extends RecursiveTask<Long> { private final Long min; private final Long max; private Long threshold = 1000L; public SumForkJoinTask(Long min, Long max) { this.min = min; this.max = max; } @Override protected Long compute() { // 小於閾值時直接計算 if ((max - min) <= threshold) { long sum = 0; for (long i = min; i < max; i++) { sum = sum + i; } return sum; } // 拆分成小任務 long middle = (max + min) >>> 1; SumForkJoinTask leftTask = new SumForkJoinTask(min, middle); leftTask.fork(); SumForkJoinTask rightTask = new SumForkJoinTask(middle, max); rightTask.fork(); // 彙總結果 return leftTask.join() + rightTask.join(); } }
上述程式碼邏輯可通過下圖更加直觀的理解。
ForkJoin框架實現
在ForkJoin框架中重要的一些介面和類如下圖所示。
ForkJoinPool
ForkJoinPool
是用於執行ForkJoinTasks
的執行緒池,實現了Executor
介面。
可以通過new ForkJoinPool()
直接建立ForkJoinPool
物件。
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode){ this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
通過檢視構造方法原始碼我們可以發現,在建立ForkJoinPool
時,有以下4個引數:
- parallelism:期望併發數。預設會使用
Runtime.getRuntime().availableProcessors()
的值 - factory:建立
ForkJoin
工作執行緒的工廠,預設為defaultForkJoinWorkerThreadFactory
- handler:執行任務時遇到不可恢復的錯誤時的處理程式,預設為
null
- asyncMode:工作執行緒獲取任務使用FIFO模式還是LIFO模式,預設為LIFO
ForkJoinTask
ForkJoinTask
是一個對於在ForkJoinPool
中執行任務的抽象類定義。
可以通過少量的執行緒處理大量任務和子任務,ForkJoinTask
實現了Future
介面。主要通過fork()
方法安排非同步任務執行,通過join()
方法等待任務執行的結果。
想要使用ForkJoinTask
通過少量的執行緒處理大量任務,需要接受一些限制。
- 拆分的任務中避免同步方法或同步程式碼塊;
- 在細分的任務中避免執行阻塞I/O操作,理想情況下基於完全獨立於其他正在執行的任務訪問的變數;
- 不允許在細分任務中丟擲受檢異常。
因為ForkJoinTask
是抽象類不能被例項化,所以在使用時JDK為我們提供了三種特定型別的ForkJoinTask父類供我們自定義時繼承使用。
- RecursiveAction:子任務不返回結果
- RecursiveTask:子任務返回結果
- CountedCompleter:在任務完成執行後會觸發執行
ForkJoinWorkerThread
ForkJoinPool
中用於執行ForkJoinTask
的執行緒。
ForkJoinPool
既然實現了Executor
介面,那麼它和我們常用的ThreadPoolExecutor
之前又有什麼差異呢?
如果們使用ThreadPoolExecutor
來完成分治法的邏輯,那麼每個子任務都需要建立一個執行緒,當子任務的數量很大的情況下,可能會達到上萬個,那麼使用ThreadPoolExecutor
創建出上萬個執行緒,這顯然是不可行、不合理的;
而ForkJoinPool
在處理任務時,並不會按照任務開啟執行緒,只會按照指定的期望並行數量建立執行緒。在每個執行緒工作時,如果需要繼續拆分子任務,則會將當前任務放入ForkJoinWorkerThread
的任務佇列中,遞迴處理直到最外層的任務。
工作竊取演算法
ForkJoinPool
的各個工作執行緒都會維護一個各自的任務佇列,減少執行緒之間對於任務的競爭;
每個執行緒都會先保證將自己佇列中的任務執行完,當自己的任務執行完之後,會去看其他執行緒的任務佇列中是否有未處理完的任務,如果有則會幫助其他執行緒執行;
為了減少在幫助其他執行緒執行任務時發生競爭,會使用雙端佇列來存放任務,被竊取的任務只會從佇列的頭部獲取任務,而正常處理的執行緒每次都是從佇列的尾部獲取任務。
優點
充分利用了執行緒資源,避免資源的浪費,並且減少了執行緒間的競爭。
缺點
需要給每個執行緒開闢一個佇列空間;在工作佇列中只有一個任務時同樣會存線上程競爭。
最後
如果覺得文章對你有點幫助,不妨掃碼點個關注。我是小黑,下期見~