1. 程式人生 > >Java8之Stream(2)

Java8之Stream(2)

三、Fork-Join機制介紹

Stream之所以可以使用並行運算,之所以能夠最大程度的使用CPU資源,是因為他的Fork-Join程式設計模型,在本章中我們介紹一下Fork-Join的使用,希望讀者能掌握在Java中如何使用Fork-Join這一程式設計模型,本章內容大致如下

v Fork-Join簡單使用

v 快速排序演算法

v Fork-Join並行快速排序

3.1 Fork-Join簡單實用

在開始說Fork-Join的API以及原理之前,我們先來看一個非常簡單的例子,讓讀者能夠快速上手,我們在本章中我們實現一個非常簡單的演算法,就是計算自然數的累加結果,從start到end,然後將結果輸出出來即可,這個簡單的再也不能簡單了。

我們用傳統方法實現一個,大家看一下

    @Test

    public void testTradition()

    {

        long startTime = System.currentTimeMillis();

        int start = 0;

        int end = 1500000;

        int sum = 0;

        for(int i = start;i<end;i++)

        {

            sum+=i;

        }

        System.out.println("Tradition Result:"+sum+",speed time:"+(System.currentTimeMillis()-startTime));

    }

好了,不想多說了,我們看一下,如何使用Fork-Join來實現這樣的需求

package com.wangwenjun.forkjoin;

import java.util.concurrent.RecursiveTask;

/**

 * Created by wangwenjun on 2015/8/8.

 */

public class ConcurrencyCalculator

        extends RecursiveTask<Integer> {

    private final int start;

    private final int end;

    private final static int THRESHOLD = 5;

    public ConcurrencyCalculator(int start, int end) {

        this.start = start;

        this.end = end;

    }

    @Override

    protected Integer compute() {

        int result = 0;

        if ((end -start) < THRESHOLD) {

            for (int x = start; x < end; x++) {

                result += x;

            }

        } else {

            int middle = (start + end) / 2;

            ConcurrencyCalculator leftCalculator = new ConcurrencyCalculator(start, middle);

            ConcurrencyCalculator rightCalculator = new ConcurrencyCalculator(middle, end);

            leftCalculator.fork();

            rightCalculator.fork();

            result += (leftCalculator.join() + rightCalculator.join());

        }

        return result;

    }

}

如果你對上面的程式碼理解上有些困難,別擔心,慢慢的介紹了Fork-Join和API的使用之後,你就會明白如何使用了。

3.2 Fork-Join原理以及API介紹

3.2.1 Fork-Join原理介紹

我還是不太喜歡抄襲太多別人的東西來說他的基本原理,我們來舉個簡單的例子吧,假設你要到資料庫中查詢一百萬條資料,資料查詢結束之後,如要對每條資料進行一些額外的操作,操作有可能很簡單,有可能很複雜,如果你用單執行緒的方式處理勢必很慢,如果你用多執行緒的方式處理,你需要對這100萬條資料進行拆分,那些資料交給那個執行緒去操作,等所有的執行緒操作結束之後然後在集中將結果合併,說白了就是採用分而治之的辦法,Fork-Join模型一言蔽之就是採用分而治之的方式,將一個任務分解成若干個子問題,子問題還可以繼續拆分,如下圖所示:

 

Doug Lea大神開發的Fork/Join平行計算框架,整合到了JDK 7中,很優雅的讓你遠離了自己進行加鎖,釋放鎖,隱藏了工作執行緒,執行緒的排程等,所以你才能看到上面例子中用了很少的程式碼就可以模擬一個並行運算的例子。

3.2.2 Fork-Join API的介紹

Fork-Join 主要的API大致如下所示

 

如果你計算的需要返回值就繼承RecursiveTask抽線類,如果不需要返回值就繼承RecursiveAction抽象類。

看到這裡關於API的介紹和基本原理的介紹,你再回頭看看上面的例子,相信就不難理解了吧,在接下來我們再來實現一個比較複雜的快速排序採用並行的方式進行,加深理解。

3.3 快速排序演算法

快速排序演算法是冒泡演算法的改進版本,正如他的名字一樣,排序速度是相當的快,快速排序由C. A. R. Hoare在1962年提出。它的基本思想是:通過一趟排序將要排序的資料分割成獨立的兩部分,其中一部分的所有資料都比另外一部分的所有資料都要小,然後再按此方法對這兩部分資料分別進行快速排序,整個排序過程可以遞迴進行,以此達到整個資料變成有序序列。

不想浪費太多的筆墨介紹快速排序演算法的原理,相信很多人,尤其是IT行業的從業者,對於寫出快速排序演算法這樣的基本功都是信手拈來的事情,我在這裡只是將我的實現程式碼粘貼出來。

package com.wangwenjun.forkjoin;

import java.util.Random;

/**

 * Created by wangwenjun on 2015/8/9.

 */

public class QuickSort {

    private final int LEN = 10;

    public int[] prepareForData() {

        Random random = new Random(System.currentTimeMillis());

        int[] originalArray = new int[LEN];

        for (int i = 0; i < LEN; i++) {

            originalArray[i] = random.nextInt(20);

        }

        return originalArray;

    }

    public void print(String literal, int[] array) {

        System.out.println(literal);

        for (int element : array) {

            System.out.print(element + " ");

        }

        System.out.println();

    }

    public void quickSort(int[] originalArray) {

        if (originalArray.length > 0) {

            sort(originalArray, 0, originalArray.length - 1);

        }

    }

    private void sort(int[] originalArray, int low, int high) {

        if (low < high) {

            int middle = partition(originalArray, low, high);

            sort(originalArray, low, middle - 1);

            sort(originalArray, middle + 1, high);

        }

    }

    private int partition(int[] array, int low, int high) {

        int tmp = array[low];

        while (low < high) {

            while (low < high && array[high] >= tmp) {

                high--;

            }

            array[low] = array[high];

            while (low < high && array[low] <= tmp) {

                low++;

            }

            array[high] = array[low];

        }

        array[low] = tmp;

        return low;

    }

}

寫個測試程式碼,測試一下,程式碼如下所示:

package com.wangwenjun.forkjoin;

import org.junit.Test;

/**

 * Created by wangwenjun on 2015/8/9.

 */

public class QuickSortTest {

    /**

     * original literal

     */

    private final static String ORIGINAL_LITERAL = "====The original array as below====";

    /**

     * the array after sort.

     */

    private final static String SORTED_LITERAL = "====The array after sorted as below====";

    @Test

    public void testQuickSort() {

        QuickSort quickSort = new QuickSort();

        int[] originalArray = quickSort.prepareForData();

        quickSort.print(ORIGINAL_LITERAL, originalArray);

        quickSort.quickSort(originalArray);

        quickSort.print(SORTED_LITERAL, originalArray);

    }

}

我運行了一次,大致結果如下所示

====The original array as below====

12 18 12 18 2 1 7 9 14 13 

====The array after sorted as below====

1 2 7 9 12 12 13 14 18 18 

3.4 使用Fork-Join實現並行快速排序演算法

我們在採用Fork-Join的方式來實現一下,看看如何使用並行的方式提高計算能力,提升效能。

程式碼如下所示:

package com.wangwenjun.forkjoin;

import java.util.List;

import java.util.concurrent.RecursiveAction;

/**

 * Created by wangwenjun on 2015/8/8.

 */

public class ForkJoinQuickSort<T extends Comparable> extends RecursiveAction {

    private List<T> data;

    private int left;

    private int right;

    public ForkJoinQuickSort(List<T> data) {

        this.data = data;

        this.left = 0;

        this.right = data.size() - 1;

    }

    public ForkJoinQuickSort(List<T> data, int left, int right) {

        this.data = data;

        this.left = left;

        this.right = right;

    }

    @Override

    protected void compute() {

        if (left < right) {

            int pivotIndex = left + ((right - left) / 2);

            pivotIndex = partition(pivotIndex);

            invokeAll(new ForkJoinQuickSort(data, left, pivotIndex - 1),

                    new ForkJoinQuickSort(data, pivotIndex + 1, right));

        }

    }

    private int partition(int pivotIndex) {

        T pivotValue = data.get(pivotIndex);

        swap(pivotIndex, right);

        int storeIndex = left;

        for (int i = left; i < right; i++) {

            if (data.get(i).compareTo(pivotValue) < 0) {

                swap(i, storeIndex);

                storeIndex++;

            }

        }

        swap(storeIndex, right);

        return storeIndex;

    }

    private void swap(int i, int j) {

        if (i != j) {

            T iValue = data.get(i);

            data.set(i, data.get(j));

            data.set(j, iValue);

        }

    }

}

測試程式碼如下所示:

package com.wangwenjun.forkjoin;

import org.junit.Test;

import java.util.ArrayList;

import java.util.List;

import java.util.Random;

import java.util.concurrent.ForkJoinPool;

/**

 * Created by wangwenjun on 2015/8/9.

 */

public class ForkJoinQuickSortTest {

    private static final int LEN = 10;

    private List<Integer> prepareForData() {

        Random random = new Random(System.currentTimeMillis());

        List<Integer> data = new ArrayList<>();

        for (int i = 0; i < LEN; i++) {

            data.add(random.nextInt(20));

        }

        return data;

    }

    private void print(String literal, List<Integer> data) {

        System.out.println(literal);

        for (int element : data) {

            System.out.print(element + " ");

        }

        System.out.println();

    }

    @Test

    public void test() {

        List<Integer> data = prepareForData();

        print("####before", data);

        ForkJoinQuickSort<Integer> quickSort = new ForkJoinQuickSort<>(data);

        ForkJoinPool pool = new ForkJoinPool();

        pool.invoke(quickSort);

        print("####after", data);

    }

}

執行結果如下所示:

####before

19 4 9 2 9 14 18 5 6 18 

####after

2 4 5 6 9 9 14 18 18 19 

3.5 Fork-Join總結

Fork-Join程式設計模型出來的時間其實已經不算晚了,在Java 1.7版本中才被引入,做Unix C++開發的人早都掌握該項技能了,好飯不怕晚,在我們平時的工作中他還有很多的應用場景,比如你的任務很適合進行拆分,並且比較容易進行合併,提高程式的執行速度,但是我個人建議不能將獲取資源的地方使用Fork,比如你要去網路讀資料或者從資料庫中讀取資料,分開多個任務會導致網路以及資料庫的壓力,將處理過程Fork是一個不錯的選擇,獲取資料除非特別需要,否則不要使用Fork增加並行,對資源提供者也會是一個不小的壓力。