1. 程式人生 > >併發程式設計之Fork/Join

併發程式設計之Fork/Join

併發與並行

併發:多個程序交替執行。

並行:多個程序同時進行,不存線上程的上下文切換。

併發與並行的目的都是使CPU的利用率達到最大。Fork/Join就是為了儘可能提高硬體的使用率而應運而生的。

計算密集型與IO密集型

計算密集型:也稱之為CPU密集型,此時系統的硬碟,記憶體效能相對於CPU要很多。系統在運作的時候CPU是處於100% loading的狀態,在系統完成磁碟的讀寫(I/O)以後,程式就會進行計算,在進行計算的時候CPU佔用率是很高的。計算密集型任務最大的特點就是進行大量的計算,消耗CPU資源,比如說高清解碼,計算圓周率啥的,都是靠CPU的運算能力。這種型別的任務雖然也支援多工,但是花費在任務切換的時間越多,執行效率就越低,要最高效的利用cpu,建議任務數小於核心執行緒數。程式碼執行效率也很關鍵,一般使用C語言來寫。執行緒數的設定:CPU核數+1(現代CPU支援超執行緒)。

IO密集型:CPU效能要比硬碟,記憶體效能好很多。這時候,大部分的情況是CPU在等I/O的讀寫操作,此時CPU loading並不是很高。I/O bound的程式一般在達到極限的時候,CPU利用率仍然比較低。對於IO密集型的任務主要涉及到網路,磁碟IO.特點是CPU消耗很少,任務的大部分時間都是在等待IO操作完成(磁碟IO的速度遠遠低於cpu與記憶體的速度)。對於這種任務,任務越多,CPU的效率越高。對於這種任務適合使用開發效率最高的指令碼語言,C語言基本上沒啥用。執行緒數的設定:(執行緒等待時間+執行緒CPU時間)/執行緒CPU時間)*CPU數目。

如何利用多核CPU,計算很大陣列中所有整數進行排序?

當資料量小的時候使用快速排序快,快速排序顯著的特徵是用遞迴的方法去排序的。當資料量大的時候歸遞排序。遞迴排序的思想就是在陣列中取一箇中間值,將一個數組分為2個,一個比中間值大,一個比中間值小,如此反覆拆分排序,直到最後無法再進行拆分,然後將結果合併。因此遞迴方法除了空間複雜度增加了,還可能會產生棧溢位。(程式計數器是唯一不會發生棧溢位的),虛擬機器棧預設最大空間是1M.   

分治思想:就是將一個規模大的問題劃分為規模較小的子問題,然後逐步解決小問題,最後合併子問題的解就得到了原問題的解。即分割原問題--求解子問題--合併子問題的解。

子問題一般都是相互獨立的,因此,通常通過遞迴呼叫演算法來求解子問題。

Fork/Join框架

  Fork/Join 是一個用於並行執行任務的框架,是一個把大任務拆分成小任務,執行小任務,最後彙總小任務的結果得到大任務的結果的框架。整體框架如下:

 

 

 

Fork/Join 特徵:

1、ForkJoinPool是ExecutorService的補充 ,適用於一些特定的場景,適合於計算密集型場景。如果存在I/O,執行緒間同步,sleep()等會造成執行緒長時間阻塞的情況,此時可以配合ManagedBlocker使用。

2、ForkJoinPool主要是實現分治法,分治之後遞迴呼叫函式。

 

ForkJoinPool 框架主要類

ForkJoinPool 實現ForkJoin的執行緒池 - ThreadPool

ForkJoinWorkerThread 實現ForkJoin的執行緒

ForkJoinTask<V> 一個描述ForkJoin的抽象類 Runnable/Callable

RecursiveAction 無返回結果的ForkJoinTask實現Runnable

RecursiveTask<V> 有返回結果的ForkJoinTask實現Callable

CountedCompleter<T> 在任務完成執行後會觸發執行一個自定義的鉤子函式

提交任務:

 

 

fork()類似於Thread.start(),但是它並不立即執行任務,而是將任務放入工作佇列中, 跟Thread.join()不同,ForkJoinTask的join()方法並不簡單的阻塞執行緒 利用工作執行緒執行其他任務, 當一個工作執行緒中中呼叫join(),它將處理其他任務,直到注意到目標子任務已經完成。

 ForkJoinPool中的所有工作執行緒都有一個自己的工作佇列WorkQueue,是一個雙端佇列Deque,從隊頭取任務,先進後出,執行緒私有,不共享。

如下圖所示:

 

 

執行緒竊取

工作竊取就是指某個執行緒從其他佇列裡竊取任務來執行。在ForkJoinPool中就是將一個大任務分成n個互不依賴的子任務,為了減少執行緒之間的競爭,於是把這些子任務放到不同的隊列當中去,併為每一個對列建立一個執行緒來執行佇列中的任務,A佇列的任務由A執行緒來執行。但是有的執行緒執行得比較快,很快就把自己隊列當中的任務執行完成了,但是A佇列裡還有待執行的任務,這時候這個執行緒(假設是B執行緒)就會去竊取他的隊列當中的任務來執行。為了減少竊取任務執行緒與被竊取任務執行緒之間的競爭,採用雙端佇列,竊取任務是從隊尾竊取,被竊取任務執行緒從隊頭獲取任務來執行。

為了儘可能的提高CPU的利用率,空閒的執行緒將從其他執行緒的佇列中竊取任務來執行,從workQueue的隊尾竊取任務,從而減少競爭,任務的竊取是遵從FIFO順序進行的,因為先放入的任務往往表示更大的工作量,竊取來的任務支援進一步的遞迴分解。

WorkQueue雙端佇列最小化任務“竊取”的競爭, push()/pop()僅在其所有者工作執行緒中呼叫 ,這些操作都是通過CAS來實現的,是Wait-free的 。

poll() 則由其他工作執行緒來呼叫“竊取”任務 可能不是wait-free。任務竊取的好處就是充分利用了資源,但是也有缺點,當隊列當中只有一個任務的時候,就會出現競爭,並且系統會耗費更多的資源,比如建立多個執行緒和多個雙端佇列。

 

 

 總結一下就是:

1. ForkJoinPool 的每個工作執行緒都維護著一個工作佇列(WorkQueue),這是一個雙端佇列(Deque),裡面存放的物件是任務(ForkJoinTask)。 2. 每個工作執行緒在執行中產生新的任務(通常是因為呼叫了 fork())時,會放入工作佇列的隊頭(左為隊尾,右側為隊頭),並且工作執行緒在處理自己的工作佇列時,使用的是 LIFO 方式,也就是說每次從隊頭取出任務來執行。(ForkJoinTask的fork()的子任務,將放入執行該任務的工作執行緒的隊頭,工作執行緒以LIFO的順序來處理佇列中的任務) 3. 每個工作執行緒在處理自己的工作佇列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作執行緒的工作佇列),竊取的任務位於其他執行緒的工作佇列的隊尾,也就是說工作執行緒在竊取其他工作執行緒的任務時,使用的是FIFO 方式。 4. 在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。 5. 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。

程式碼如下:

 public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

為了測試ForkJoinPool的好處,我們來看以下兩段程式碼,來對比一下結果:

首先我們來看一下,就用自己寫的分任務執行,來計算

package com.test.executor.arrsum;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.test.executor.arrsum.utils.Utils;

public class SumRecursiveMT {
    public static class RecursiveSumTask implements Callable<Long> {
        public static final int SEQUENTIAL_CUTOFF = 1;
        int lo;
        int hi;
        int[] arr; // arguments
        ExecutorService executorService;

        RecursiveSumTask( ExecutorService executorService, int[] a, int l, int h) {
            this.executorService = executorService;
            this.arr = a;
            this.lo = l;
            this.hi = h;
        }

        public Long call() throws Exception { // override
            System.out.format("%s range [%d-%d] begin to compute %n",
                    Thread.currentThread().getName(), lo, hi);
            long result = 0;
            if (hi - lo <= SEQUENTIAL_CUTOFF) {
                for (int i = lo; i < hi; i++)
                    result += arr[i];

                System.out.format("%s range [%d-%d] begin to finished %n",
                        Thread.currentThread().getName(), lo, hi);
            }
            else {
                RecursiveSumTask left = new RecursiveSumTask(executorService, arr, lo, (hi + lo) / 2);
                RecursiveSumTask right = new RecursiveSumTask(executorService, arr, (hi + lo) / 2, hi);
                Future<Long> lr = executorService.submit(left);
                Future<Long> rr = executorService.submit(right);

                result = lr.get() + rr.get();
                System.out.format("%s range [%d-%d] finished to compute %n",
                        Thread.currentThread().getName(), lo, hi);
            }

            return result;
        }
    }


    public static long sum(int[] arr) throws Exception {
        int nofProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        //ExecutorService executorService = Executors.newCachedThreadPool();

        RecursiveSumTask task = new RecursiveSumTask(executorService, arr, 0, arr.length);
        long result =  executorService.submit(task).get();
        return result;
    }

  //執行該方法,看看測試結果 public static void main(String[] args) throws Exception { int[] arr = Utils.buildRandomIntArray(20); System.out.printf("The array length is: %d\n", arr.length); long result = sum(arr); System.out.printf("The result is: %d\n", result); } } package com.test.executor.arrsum.utils; import java.util.Random; public class Utils { public static int[] buildRandomIntArray(int size) { int[] array = new int[size]; for (int i = 0; i < size; i++) { array[i] = new Random().nextInt(100); } return array; } public static int[] buildRandomIntArray() { int size = new Random().nextInt(100); int[] array = new int[size]; for (int i = 0; i < size; i++) { array[i] = new Random().nextInt(100); } return array; } public static void main(String[] args) { int[] ints = Utils.buildRandomIntArray(20); for (int i = 0; i < ints.length; i++) { System.out.println(ints[i]); } } }
package com.test.executor.arrsum;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.test.executor.arrsum.utils.Utils;

public class SumRecursiveMT {
    public static class RecursiveSumTask implements Callable<Long> {
        public static final int SEQUENTIAL_CUTOFF = 1;
        int lo;
        int hi;
        int[] arr; // arguments
        ExecutorService executorService;

        RecursiveSumTask( ExecutorService executorService, int[] a, int l, int h) {
            this.executorService = executorService;
            this.arr = a;
            this.lo = l;
            this.hi = h;
        }

        public Long call() throws Exception { // override
            System.out.format("%s range [%d-%d] begin to compute %n",
                    Thread.currentThread().getName(), lo, hi);
            long result = 0;
            if (hi - lo <= SEQUENTIAL_CUTOFF) {
                for (int i = lo; i < hi; i++)
                    result += arr[i];

                System.out.format("%s range [%d-%d] begin to finished %n",
                        Thread.currentThread().getName(), lo, hi);
            }
            else {
                RecursiveSumTask left = new RecursiveSumTask(executorService, arr, lo, (hi + lo) / 2);
                RecursiveSumTask right = new RecursiveSumTask(executorService, arr, (hi + lo) / 2, hi);
                Future<Long> lr = executorService.submit(left);
                Future<Long> rr = executorService.submit(right);

                result = lr.get() + rr.get();
                System.out.format("%s range [%d-%d] finished to compute %n",
                        Thread.currentThread().getName(), lo, hi);
            }

            return result;
        }
    }


    public static long sum(int[] arr) throws Exception {
        int nofProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        //ExecutorService executorService = Executors.newCachedThreadPool();

        RecursiveSumTask task = new RecursiveSumTask(executorService, arr, 0, arr.length);
        long result =  executorService.submit(task).get();
        return result;
    }

    public static void main(String[] args) throws Exception {
        int[] arr = Utils.buildRandomIntArray(20);
        System.out.printf("The array length is: %d\n", arr.length);
        
        long result = sum(arr);

        System.out.printf("The result is: %d\n", result);

    }
}

 

執行該程式碼的結果如下:

 

 結果一直沒有出來,就說明一直在計算。因為執行緒在遞迴計算,開的執行緒太多,然後計算時間比較長。

ForkJoin的使用

ForkJoinTask:我們要使用ForkJoin框架,就要建立一個ForkJoin 任務,建立ForkJoin任務的話,不需要直接繼承ForkJoinTask類,而是繼承他的子類.ForkJoin框架有兩個子類RecursiveAction和RecursiveTask<V>。

  1、RecursiveAction:用於返回沒有結果的任務。(比如寫資料到磁碟以後就退出。一個RecursiveAction可以把工作分割成若干小塊,由獨立的執行緒或者CPU執行,通過繼承實現RecursiveAction)

  2、RecursiveTask<V> :用於執行有返回結果的任務。(將一個任務分割成若干的子任務,每個子任務返回的值合併到一個集體結果,可以水平的分割和合並。)

 ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。任務分割出來的子任務會新增到當前工作執行緒的雙端隊列當中,進入佇列的頭部。當一個工作執行緒的佇列中沒有任務的時候它會從其他佇列的尾部獲取任務來執行。

接下來來看看用ForkJoinPool來計算的程式碼:

package com.test.executor.arrsum;

import java.util.concurrent.RecursiveTask;

/**
 * RecursiveTask 平行計算,同步有返回值
 * ForkJoin框架處理的任務基本都能使用遞迴處理,比如求斐波那契數列等,但遞迴演算法的缺陷是:
 *    一隻會只用單執行緒處理,
 *    二是遞迴次數過多時會導致堆疊溢位;
 * ForkJoin解決了這兩個問題,使用多執行緒併發處理,充分利用計算資源來提高效率,同時避免堆疊溢位發生。
 * 當然像求斐波那契數列這種小問題直接使用線性演算法搞定可能更簡單,實際應用中完全沒必要使用ForkJoin框架,
 * 所以ForkJoin是核彈,是用來對付大傢伙的,比如超大陣列排序。
 * 最佳應用場景:多核、多記憶體、可以分割計算再合併的計算密集型任務
 */
class LongSum extends RecursiveTask<Long> {

    static final int SEQUENTIAL_THRESHOLD = 1000;
    static final long NPS = (1000L * 1000 * 1000);
    static final boolean extraWork = true; // change to add more than just a sum


    int low;
    int high;
    int[] array;

    LongSum(int[] arr, int lo, int hi) {
        array = arr;
        low = lo;
        high = hi;
    }

    /**
     * fork()方法:將任務放入佇列並安排非同步執行,一個任務應該只調用一次fork()函式,除非已經執行完畢並重新初始化。
     * tryUnfork()方法:嘗試把任務從佇列中拿出單獨處理,但不一定成功。
     * join()方法:等待計算完成並返回計算結果。
     * isCompletedAbnormally()方法:用於判斷任務計算是否發生異常。
     */
    protected Long compute() {

        if (high - low <= SEQUENTIAL_THRESHOLD) {
            long sum = 0;
            for (int i = low; i < high; ++i) {
                sum += array[i];
            }
            return sum;

        } else {
            int mid = low + (high - low) / 2;
            LongSum left = new LongSum(array, low, mid);
            LongSum right = new LongSum(array, mid, high);
            left.fork();
            right.fork();
            long rightAns = right.join();
            long leftAns = left.join();
            return leftAns + rightAns;
        }
    }
}

       package com.test.executor.arrsum;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

import com.test.executor.arrsum.utils.Utils;

public class LongSumMain {

    //獲取邏輯處理器數量
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** for time conversion */
    static final long NPS = (1000L * 1000 * 1000);

    static long calcSum;

    static final boolean reportSteals = true;

    public static void main(String[] args) throws Exception {
        int[] array = Utils.buildRandomIntArray(20000000);
        System.out.println("cpu-num:"+NCPU);
        //單執行緒下計算陣列資料總和
         calcSum = seqSum(array);
        System.out.println("seq sum=" + calcSum);

        //採用fork/join方式將陣列求和任務進行拆分執行,最後合併結果
        LongSum ls = new LongSum(array, 0, array.length);
          ForkJoinPool fjp  = new ForkJoinPool(NCPU); //使用的執行緒數
        ForkJoinTask<Long> task = fjp.submit(ls);
        System.out.println("forkjoin sum=" + task.get());

        if(task.isCompletedAbnormally()){
            System.out.println(task.getException());
        }

        fjp.shutdown();

    }


    static long seqSum(int[] array) {
        long sum = 0;
        for (int i = 0; i < array.length; ++i)
            sum += array[i];
        return sum;
    }

}

以上的執行結果就很快:

cpu-num:4
seq sum=989877234
forkjoin sum=989877234

 Fork/Join框架原理

異常處理

   ForkJoinTask在執行任務的時候可能會拋異常,此時我們沒有辦法從主執行緒裡面獲取異常,所以我們使用以下幾種方法來判斷以及獲取異常:

  1、isCompletedAbnormally()方法來判斷任務有沒有丟擲異常或者被取消。

  2、getException()可以獲取到異常。

  3、isCompletedNormally()這個方法是看任務是否正常執行完成且沒有任何異常。

  示例:

if(task.isCompletedAbnormally())
   System.out.print(task.getException());

ForkJoinPool構造方法

 public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
 public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        Thread.UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        checkPermission();
        if (factory == null)
            throw new NullPointerException();
        if (parallelism <= 0 || parallelism > MAX_ID)
            throw new IllegalArgumentException();
        this.parallelism = parallelism;
        this.factory = factory;
        this.ueh = handler;
        this.locallyFifo = asyncMode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        // initialize workers array with room for 2*parallelism if possible
        int n = parallelism << 1;
        if (n >= MAX_ID)
            n = MAX_ID;
        else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        }
        workers = new ForkJoinWorkerThread[n + 1];
        this.submissionLock = new ReentrantLock();
        this.termination = submissionLock.newCondition();
        StringBuilder sb = new StringBuilder("ForkJoinPool-");
        sb.append(poolNumberGenerator.incrementAndGet());
        sb.append("-worker-");
        this.workerNamePrefix = sb.toString();
    }

重要引數說明:

1、parallelism:並行數。一般跟CPU個數保持一致。通過Runtime.getRuntime().availableProcessors()可以獲取到當前機器的CPU個數。

2、ForkJoinWorkerThreadFactory factory:建立執行緒的工廠

3、Handler  :執行緒異常處理器,Thread.UncaughtExceptionHandler ,該處理器線上程執行任務時由於某些無法預料到的錯誤而導致任務執行緒中斷時進行一些處理,預設情況為null。

 4、boolean asyncMode: 表示工作執行緒內的任務佇列是採用何種方式進行排程,可以是先進先出FIFO,也可以是先進後出FILO.如果為true,則表示執行緒池中的執行緒使用先進先出的方式進行排程,預設為false.

ForkJoinTask fork()/join()方法

1、fork():這個方法的作用就是將任務放到當前執行緒的工作隊列當中去;

public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

2、join()的方法我們先看一下程式碼:

 private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

  */
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }
public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

1、檢查呼叫Join()方法的執行緒是否是ForkJoinWorkerThread,如果不是的話就阻塞當前執行緒,等待任務完成,如果是則不阻塞;

2、判斷任務的狀態,是否已經完成,如果已經完成,則返回結果;

3、任務沒有完成,判斷任務是否處於自己的隊列當中,如果是,就取出執行完任務;

4、任務沒有在自己隊列當中,則說明任務被偷走,找到偷走任務的小偷,竊取小偷工作佇列中的任務,並執行,幫助小偷快點完成待join的任務;

5、若小偷偷走的任務已經完成,則找到小偷的小偷,幫助他完成任務;

6、遞迴執行5;

總體歸納起來的流程如下:

 

 

 ForkJoinPool 之submit()方法

  public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
        return task;
    }
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }

ForkJoinPool有自己的工作佇列,這些工作對列是用來接收由外部執行緒(非ForkJoinThread)提交過來的任務,這個對列稱為submittingQueue。submit()和fork()沒有本質的區別,只是提交物件是submittingQueue.submittingQueue也是工作執行緒竊取物件,當其中的任務被工作執行緒竊取成功的時候,代表提交任務正式進入執行階段。

 

Fork/Join框架執行流程

&n