1. 程式人生 > >使用分支/合併框架執行並行求和

使用分支/合併框架執行並行求和

分支/合併框架

​ 分支/合併框架的目的是以遞迴方式將可以並行的任務拆分成更小的任務,然後將每個子任
務的結果合併起來生成整體結果。它是ExecutorService介面的一個實現,它把子任務分配給
執行緒池(稱為ForkJoinPool)中的工作執行緒。首先來看看如何定義任務和子任務。

使用RecursiveTask

​ 要把任務提交到這個池,必須建立RecursiveTask 的一個子類,其中R是並行化任務(以 及所有子任務)產生的結果型別,或者如果任務不返回結果,則是RecursiveAction型別(當 然它可能會更新其他非區域性機構)。要定義RecursiveTask,只需實現它唯一的抽象方法 compute:

protected abstract R compute(); 

​ 這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成 單個子任務結果的邏輯。

​ 正由於此,這個方法的實現類似於下面的虛擬碼:

if (任務足夠小或不可分) { 順序計算該任務
} else {
     將任務分成兩個子任務
     遞迴呼叫本方法,拆分每個子任務,等待所有子任務完成
     合併每個子任務的結果
}

​ 一般來說並沒有確切的標準決定一個任務是否應該再拆分,但有幾種試探方法可以幫助你做
出這一決定,遞迴的任務拆分過程如圖下圖所示:

​ 你可能已經注意到,這只不過是著名的分治演算法的並行版本而已。這裡舉一個用分支/合併
框架的實際例子,讓我們試著用這個框架為一個數字範圍(這裡用一個
long[]陣列表示)求和。如前所述,你需要先為RecursiveTask類做一個實現,就是下面程式碼
ForkJoinSumCalculator。

用分支/合併框架執行並行求和

package java8.java8example;

import java.util.concurrent.RecursiveTask;

/**
 * @desctiption:
 * @author: yinghuaYang
 * @date: 2019/1/7
 */

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    /**
     * 要求和的陣列
     */
    private final long[] numners;

    /**
     * 子任務處理的陣列的起始位置
     */
    private final int start;

    /**
     * 子任務處理的陣列的終止位置
     */
    private final int end;

    /**
     * 不再將任務分解為子任務的陣列大小
     */
    public static final long THRESHOLD = 10000;

    /**
     * 公共構造方法用於建立主任務
     * @param numners
     */
    public ForkJoinSumCalculator(long[] numners) {
        this(numners,0,numners.length);
    }

    /**
     * 私有構造方法用於以遞迴方式為主任務建立子任務
     * @param numners
     * @param start
     * @param end
     */
    private ForkJoinSumCalculator(long[] numners, int start, int end) {
        this.numners = numners;
        this.start = start;
        this.end = end;
    }

    /**
     * 覆蓋RecursiveTask抽象方法
     *
     * 該任務負責求和的步伐的大小
     * @return
     */
    @Override
    protected Long compute() {
        int length = end - start;

         /*如果大小小於或等於閾值,順序執行計算結果*/
        if (length <= THRESHOLD) {
            return computeSequentially();
        }

        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numners,start,start+length/2);
        leftTask.fork();

        /*建立一個任務為陣列的後一半求和*/
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numners,start+length/2,end);

        /*同步執行第二個子任務,有可能允許進一步遞迴劃分*/
        Long rightResult = rightTask.compute();

        /*讀取第一個子任務的結果,如果尚未完成就等待*/
        Long leftResult = leftTask.join();

        /*該任務的結果是兩個子任務結果的組合*/
        return leftResult + rightResult;
    }

    /**
     * 在子任務不再可分時及時結果的簡單寫法
     * @return
     */
    private long computeSequentially() {
        long sum = 0;

        for (int i=0;i<end;i++) {
            sum += numners[i];
        }

        return sum;
    }

}

​ 現在編寫一個方法來並行對前n個自然數求和就很簡單了。你只需把想要的數字陣列傳給
ForkJoinSumCalculator的建構函式:

package java8.java8example;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

/**
 * @desctiption:
 * @author: yinghuaYang
 * @date: 2019/1/7
 */

public class ForkJoinSumCalculatorMain {

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1,n).toArray();

        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);

        return new ForkJoinPool().invoke(task);
    }

    public static void main(String[] args) {
        long totalResult = forkJoinSum(10000L);
        System.out.println(totalResult);
    }
}

注意:

​ 上面用了一個LongStream來生成包含前n個自然數的陣列,然後建立一個ForkJoinTask (RecursiveTask的父類),並把陣列傳遞給程式碼所示ForkJoinSumCalculator的公共 建構函式。最後,你建立了一個新的ForkJoinPool,並把任務傳給它的呼叫方法 。

​ 在 ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果。 請注意在實際應用時,使用多個ForkJoinPool是沒有什麼意義的。正是出於這個原因,一般來說把它例項化一次,然後把例項儲存在靜態欄位中,使之成為單例,這樣就可以在軟體中任 何部分方便地重用了。這裡建立時用了其預設的無引數建構函式,這意味著想讓執行緒池使用JVM 能夠使用的所有處理器。更確切地說,該建構函式將使用Runtime.availableProcessors的 返回值來決定執行緒池使用的執行緒數。請注意availableProcessors方法雖然看起來是處理器, 但它實際上返回的是可用核心的數量,包括超執行緒生成的虛擬核心。

​ 執行ForkJoinSumCalculator,當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由池中的一個執行緒 執行,這個執行緒會呼叫任務的compute方法。該方法會檢查任務是否小到足以順序執行,如果不 夠小則會把要求和的陣列分成兩半,分給兩個新的ForkJoinSumCalculator,而它們也由 ForkJoinPool安排執行。

​ 因此,這一過程可以遞迴重複,把原任務分為更小的任務,直到滿足 不方便或不可能再進一步拆分的條件(本例中是求和的專案數小於等於10 000)。這時會順序計 算每個任務的結果,然後由分支過程建立的(隱含的)任務二叉樹遍歷回到它的根。接下來會合 並每個子任務的部分結果,從而得到總任務的結果。

這一過程如圖下圖所示:

我的新部落格地址:https://www.itaofly.com