使用分支/合併框架執行並行求和
分支/合併框架
分支/合併框架的目的是以遞迴方式將可以並行的任務拆分成更小的任務,然後將每個子任
務的結果合併起來生成整體結果。它是ExecutorService介面的一個實現,它把子任務分配給
執行緒池(稱為ForkJoinPool)中的工作執行緒。首先來看看如何定義任務和子任務。
使用RecursiveTask
要把任務提交到這個池,必須建立RecursiveTask
protected abstract R compute();
這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成 單個子任務結果的邏輯。
正由於此,這個方法的實現類似於下面的虛擬碼:
if (任務足夠小或不可分) { 順序計算該任務
} else {
將任務分成兩個子任務
遞迴呼叫本方法,拆分每個子任務,等待所有子任務完成
合併每個子任務的結果
}
一般來說並沒有確切的標準決定一個任務是否應該再拆分,但有幾種試探方法可以幫助你做
出這一決定,遞迴的任務拆分過程如圖下圖所示:
你可能已經注意到,這只不過是著名的分治演算法的並行版本而已。這裡舉一個用分支/合併
框架的實際例子,讓我們試著用這個框架為一個數字範圍(這裡用一個
long[]陣列表示)求和。如前所述,你需要先為RecursiveTask類做一個實現,就是下面程式碼
ForkJoinSumCalculator。
用分支/合併框架執行並行求和
package java8.java8example; import java.util.concurrent.RecursiveTask; /** * @desctiption: * @author: yinghuaYang * @date: 2019/1/7 */ public class ForkJoinSumCalculator extends RecursiveTask<Long> { /** * 要求和的陣列 */ private final long[] numners; /** * 子任務處理的陣列的起始位置 */ private final int start; /** * 子任務處理的陣列的終止位置 */ private final int end; /** * 不再將任務分解為子任務的陣列大小 */ public static final long THRESHOLD = 10000; /** * 公共構造方法用於建立主任務 * @param numners */ public ForkJoinSumCalculator(long[] numners) { this(numners,0,numners.length); } /** * 私有構造方法用於以遞迴方式為主任務建立子任務 * @param numners * @param start * @param end */ private ForkJoinSumCalculator(long[] numners, int start, int end) { this.numners = numners; this.start = start; this.end = end; } /** * 覆蓋RecursiveTask抽象方法 * * 該任務負責求和的步伐的大小 * @return */ @Override protected Long compute() { int length = end - start; /*如果大小小於或等於閾值,順序執行計算結果*/ if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numners,start,start+length/2); leftTask.fork(); /*建立一個任務為陣列的後一半求和*/ ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numners,start+length/2,end); /*同步執行第二個子任務,有可能允許進一步遞迴劃分*/ Long rightResult = rightTask.compute(); /*讀取第一個子任務的結果,如果尚未完成就等待*/ Long leftResult = leftTask.join(); /*該任務的結果是兩個子任務結果的組合*/ return leftResult + rightResult; } /** * 在子任務不再可分時及時結果的簡單寫法 * @return */ private long computeSequentially() { long sum = 0; for (int i=0;i<end;i++) { sum += numners[i]; } return sum; } }
現在編寫一個方法來並行對前n個自然數求和就很簡單了。你只需把想要的數字陣列傳給
ForkJoinSumCalculator的建構函式:
package java8.java8example;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
* @desctiption:
* @author: yinghuaYang
* @date: 2019/1/7
*/
public class ForkJoinSumCalculatorMain {
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
public static void main(String[] args) {
long totalResult = forkJoinSum(10000L);
System.out.println(totalResult);
}
}
注意:
上面用了一個LongStream來生成包含前n個自然數的陣列,然後建立一個ForkJoinTask (RecursiveTask的父類),並把陣列傳遞給程式碼所示ForkJoinSumCalculator的公共 建構函式。最後,你建立了一個新的ForkJoinPool,並把任務傳給它的呼叫方法 。
在 ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果。 請注意在實際應用時,使用多個ForkJoinPool是沒有什麼意義的。正是出於這個原因,一般來說把它例項化一次,然後把例項儲存在靜態欄位中,使之成為單例,這樣就可以在軟體中任 何部分方便地重用了。這裡建立時用了其預設的無引數建構函式,這意味著想讓執行緒池使用JVM 能夠使用的所有處理器。更確切地說,該建構函式將使用Runtime.availableProcessors的 返回值來決定執行緒池使用的執行緒數。請注意availableProcessors方法雖然看起來是處理器, 但它實際上返回的是可用核心的數量,包括超執行緒生成的虛擬核心。
執行ForkJoinSumCalculator,當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由池中的一個執行緒 執行,這個執行緒會呼叫任務的compute方法。該方法會檢查任務是否小到足以順序執行,如果不 夠小則會把要求和的陣列分成兩半,分給兩個新的ForkJoinSumCalculator,而它們也由 ForkJoinPool安排執行。
因此,這一過程可以遞迴重複,把原任務分為更小的任務,直到滿足 不方便或不可能再進一步拆分的條件(本例中是求和的專案數小於等於10 000)。這時會順序計 算每個任務的結果,然後由分支過程建立的(隱含的)任務二叉樹遍歷回到它的根。接下來會合 並每個子任務的部分結果,從而得到總任務的結果。
這一過程如圖下圖所示:
我的新部落格地址:https://www.itaofly.com