Oracle官方教程之Fork/Join
fork/join框架是ExecutorService
介面的一種具體實現,目的是為了幫助你更好地利用多處理器帶來的好處。它是為那些能夠被遞迴地拆解成子任務的工作型別量身設計的。其目的在於能夠使用所有可用的運算能力來提升你的應用的效能。
類似於ExecutorService
介面的其他實現,fork/join框架會將任務分發給執行緒池中的工作執行緒。fork/join框架的獨特之處在與它使用工作竊取(work-stealing)演算法。完成自己的工作而處於空閒的工作執行緒能夠從其他仍然處於忙碌(busy)狀態的工作執行緒處竊取等待執行的任務。
fork/join框架的核心是ForkJoinPool
類,它是對AbstractExecutorService
類的擴充套件。ForkJoinPool
實現了工作偷取演算法,並可以執行ForkJoinTask
任務。
基本使用方法
使用fork/join框架的第一步是編寫執行一部分工作的程式碼。你的程式碼結構看起來應該與下面所示的虛擬碼類似:
if (當前這個任務工作量足夠小) 直接完成這個任務 else 將這個任務或這部分工作分解成兩個部分 分別觸發(invoke)這兩個子任務的執行,並等待結果
你需要將這段程式碼包裹在一個ForkJoinTask
的子類中。不過,通常情況下會使用一種更為具體的的型別,或者是RecursiveTask
(會返回一個結果),或者是RecursiveAction
。
當你的ForkJoinTask
子類準備好了,建立一個代表所有需要完成工作的物件,然後將其作為引數傳遞給一個ForkJoinPool
例項的invoke()
方法即可。
要清晰,先模糊
想要了解fork/join框架的基本工作原理,接下來的這個例子會有所幫助。假設你想要模糊一張圖片。原始的source圖片由一個整數的陣列表示,每個整數表示一個畫素點的顏色數值。與source圖片相同,模糊之後的destination圖片也由一個整數陣列表示。
對圖片的模糊操作是通過對source陣列中的每一個畫素點進行處理完成的。處理的過程是這樣的:將每個畫素點的色值取出,與周圍畫素的色值(紅、黃、藍三個組成部分)放在一起取平均值,得到的結果被放入destination
public class ForkBlur extends RecursiveAction { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; // Processing window size; should be odd. private int mBlurWidth = 15; public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Reassemble destination pixel. int dpixel = (0xff000000 ) | (((int)rt) << 16) | (((int)gt) << 8) | (((int)bt) << 0); mDestination[index] = dpixel; } }
接下來你需要實現父類中的compute()
方法,它會直接執行模糊處理,或者將當前的工作拆分成兩個更小的任務。陣列的長度可以作為一個簡單的閥值來判斷任務是應該直接完成還是應該被拆分。
protected static int sThreshold = 100000; protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); }
如果前面這個方法是在一個RecursiveAction
的子類中,那麼設定任務在ForkJoinPool
中執行就再直觀不過了。通常會包含以下一些步驟:
- 建立一個表示所有需要完成工作的任務。
// source image pixels are in src // destination image pixels are in dst ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
- 建立將要用來執行任務的
ForkJoinPool
。ForkJoinPool pool = new ForkJoinPool();
- 執行任務。
pool.invoke(fb);
想要瀏覽完成的原始碼,請檢視ForkBlur
,其中還包含一些建立destination圖片檔案的額外程式碼。
標準實現
除了能夠使用fork/join框架來實現能夠在多處理系統中被並行執行的定製化演算法(如前文中的ForkBlur.java例子),在Java SE中一些比較常用的功能點也已經使用fork/join框架來實現了。在Java SE 8中,java.util.Arrays
類的一系列parallelSort()
方法就使用了fork/join來實現。這些方法與sort()
系列方法很類似,但是通過使用fork/join框架,藉助了併發來完成相關工作。在多處理器系統中,對大陣列的並行排序會比序列排序更快。這些方法究竟是如何運用fork/join框架並不在本教程的討論範圍內。想要了解更多的資訊,請參見Java API文件。
其他採用了fork/join框架的方法還包括java.util.streams
包中的一些方法,此包是作為Java SE 8發行版中Project Lambda
的一部分。想要了解更多資訊,請參見Lambda Expressions
一節。