1. 程式人生 > >JUC執行緒框架深度解析 — 08、ForkJoinTask架構

JUC執行緒框架深度解析 — 08、ForkJoinTask架構

【 ForkJoinTask分支任務 】
       分之合併的設計思想主要是來自於CPU執行環境,ForkJoinTask是在JDK1.7之後追加到java之中的一個類庫,

其主要的功能是進行資源竊取功能的實現。


❥ 但需要注意的是,在分支合併任務的處理結構中一定要注意以下的幾點:
   • 分支任務之中的資料的同步處理一定要有分支任務自己來完成,不要求進行額外的控制;
   • 在進行分支處理操作的時候不要進行IO操作;
   • 由於分支任務是捆綁一起執行的,如果出現了異常千萬別丟擲,會整體任務失敗。
❥ 在進行分支任務的處理之中主要使用如下的幾個核心類:
   • ForkJoinTask : 進行分支合併任務的處理類;
   • ForkJoinPool : 分支合併池。

【 分支任務類結構 】


【 有返回結果的任務:RecursiveTask 】
 
 下面以一個簡單的1—100累加來實現一個分支的處理任務,如果要進行1—100的累加,
可以將任務分為兩個階段:1—50累加,第二個是進行51—100的累加。

範例:實現分支處理

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class TestDemo {
    public static void main(String[] args) throws Exception {
        // 從0-100的累加處理操作
        AddTask task = new AddTask(0, 100);
        ForkJoinPool pool = new ForkJoinPool() ;
        // 提交任務
        Future<Integer> future = pool.submit(task);
        System.out.println(future.get());
    }
}
@SuppressWarnings("serial")  // 相當於繼承了ForkJoinTask父類
class AddTask extends RecursiveTask<Integer> {
    private int start;
    private int end;
    // 傳入計算的開始和結束的值
    public AddTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override   // 是進行資料的分支處理操作
    protected Integer compute() {
        // 儲存求和的計算結果
        int sum = 0;
        if (end - start < 100) { // 開啟了分支
            for (int x = start; x <= end; x++) {
                sum += x;
            }
        } else {
            int middle = (start + end) / 2; // 中間值
            // 做0 - 50累加
            AddTask leftTask = new AddTask(start, middle);
            // 做51 - 100累加
            AddTask rightTask = new AddTask(middle + 1, end);
            // 表示開啟下一個分支計算,開啟的是computer()
            leftTask.fork();
            // 表示開啟下一個分支計算,開啟的是computer()
            rightTask.fork();
            // 把兩個的分支結果進行合併
            sum = leftTask.join() + rightTask.join();
            return sum;
        }
        return sum;
    }
}   // 輸出結果:5050
     整個的分支合併之中相當於就是一個執行緒池的擴充套件概念,在整體的計算之中,每一個分支都會產生一個新的執行緒物件進行計算,唯一的區別是,預設情況下的執行緒池是由使用者自己來設定的執行緒物件,而ForkJoin是由任務類自己根據情況進行拆分處理。 

【 沒有返回值的任務:RecursiveAction 】  
很多的時候進行分支處理的時候有可能是沒有返回值的,所以可以修改一下任務的繼承父類”RecursiveAction”。

範例:定義分支任務

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        CountSave save = new CountSave() ;
        // 從0-100的累加處理操作
        AddTask task = new AddTask(save,0, 100);
        ForkJoinPool pool = new ForkJoinPool() ;
        pool.submit(task);// 提交任務
        // 當前的任務沒有結束
        while(!task.isDone()) {
            TimeUnit.MILLISECONDS.sleep(100);
        }
        // 分支任務計算完成
        if (task.isCompletedNormally()) {
            System.out.println("計算完成了:" + save.getSum());
        }
    }
}
class CountSave {  // 儲存資料處理結果
    private Lock lock = new ReentrantLock() ;
    private int sum = 0 ;  // 儲存處理結果
    public void add(int sum) {
        this.lock.lock();
        try {
            this.sum += sum ;
        } finally {  this.lock.unlock();   }
    }
    public int getSum() {
        return this.sum ;
    }
}

// 相當於繼承了ForkJoinTask父類
@SuppressWarnings("serial")
class AddTask extends RecursiveAction {
    private int start;
    private int end;
    private CountSave save ;
    // 傳入計算的開始和結束的值
    public AddTask(CountSave save,int start, int end) {
        this.save = save ;
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute() {
        // 開啟了分支
        if (end - start < 100) {
            int sum = 0; // 儲存求和的計算結果
            for (int x = start; x <= end; x++) {
                sum += x;
            }
            this.save.add(sum); // 儲存計算結果
        } else {
            // 中間值
            int middle = (start + end) / 2;
            // 做0 - 50累加
            AddTask leftTask = new AddTask(this.save,start, middle);
            // 做51 - 100累加
            AddTask rightTask = new AddTask(this.save,middle + 1, end);
            // 並行執行的任務
            super.invokeAll(leftTask, rightTask);
        }
    }
}
計算完成了:5050

實際上在使用ForkJoinTask處理的時候還可以取得使用的執行緒的訊息。

public class TestDemo {

    public static void main(String[] args) throws Exception {
        CountSave save = new CountSave() ;
        // 從0-100的累加處理操作
        AddTask task = new AddTask(save,0, 100);
        ForkJoinPool pool = new ForkJoinPool() ;
        // 提交任務
        pool.submit(task);
        while(!task.isDone()) {	// 當前的任務沒有結束
            System.out.println("活躍執行緒:" + pool.getActiveThreadCount()
                    + "、最大的併發執行緒數:" + pool.getParallelism());
            // TimeUnit.MILLISECONDS.sleep(100);
        }
        if (task.isCompletedNormally()) {	// 分支任務計算完成
            System.out.println("計算完成了:" + save.getSum());
        }
    }
}

活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:2、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
活躍執行緒:1、最大的併發執行緒數:8
計算完成了:5050

也就是說如果你現在要考慮所有底層的設計因素,那麼就必須針對當前的主機硬體環境做出判斷後才能夠寫出

良好的分支處理操作。