1. 程式人生 > >高併發第十二彈:併發容器J.U.C -- Executor元件FutureTask、ForkJoin

高併發第十二彈:併發容器J.U.C -- Executor元件FutureTask、ForkJoin

從本章開始就要說 Executor 的東西了.本次講的是一個很常用的FutureTask,和一個不是那麼常用的ForkJoin,我們現在就來介紹吧

引言

  大部分時候建立執行緒的2種方式,一種是直接繼承Thread,另外一種就是實現Runnable介面。但是這兩種方式都有一個缺陷就是:在執行完任務之後無法獲取執行結果。

    所以後期就提供了Callable和Future,通過它們可以在任務執行完畢之後得到任務執行結果。FutureTask又是集大成者.

一個一個來介紹

Callable:只有一個帶返回值的call方法

@FunctionalInterface
public interface
Callable<V> { V call() throws Exception; }

那麼怎麼使用Callable呢?一般情況下是配合ExecutorService來使用的,在ExecutorService介面中聲明瞭若干個submit方法的過載版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task)

  第一個submit方法裡面的引數型別就是Callable。

  暫時只需要知道Callable一般是和ExecutorService配合來使用的,具體的使用方法講在後面講述。

  一般情況下我們使用第一個submit方法和第三個submit方法,第二個submit方法很少使用。

Future介面

public interface Future<V> {
 // 取消任務
  boolean cancel(boolean mayInterruptIfRunning);     //取消狀態   
  boolean isCancelled();//是否完成
  boolean isDone();//方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
  V get() 
throws InterruptedException, ExecutionException;// 用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

也就是說Future提供了三種功能:

  1)判斷任務是否完成;

  2)能夠中斷任務;

  3)能夠獲取任務執行結果。

因為Future只是一個介面,所以是無法直接用來建立物件使用的,因此就有了下面的FutureTask。

FutureTask

FutureTask這個才是真實使用者.FutureTask實現了RunnableFuture介面,而RunnableFuture介面繼承了Runnable與Future介面,所以它既可以作為Runnable被執行緒中執行,又可以作為callable獲得返回值

public class FutureTask<V> implements RunnableFuture<V> {
    ...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

FutureTask的兩個構造類

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

使用例項

 方法一:使用Callable

ExecutorService executor = Executors.newCachedThreadPool();
        Callable<String> task = new Callable<String>() {

            @Override
            public String call() throws Exception {
                return "task 返回結果";
            }
        };
        Future<String> result = executor.submit(task);
        executor.shutdown();
        if(result.isDone()) {
            System.out.println(result.get());
        }

 方式二:使用FutureTask

FutureTask<String> task = new FutureTask<>(new Callable<String>() {

            @Override
            public String call() throws Exception {
                return "aaaa";
            }
        });
        task.run();
        System.out.println(task.get());

fork/join:

Fork/Join框架:在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這裡就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join彙總。

Fork/Join與傳統執行緒池的區別!

Fork/Join採用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到執行緒佇列中,然後再從一個隨即執行緒中偷一個並把它加入自己的佇列中。

就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的佇列中,對於傳統的執行緒,ForkJoin更有效的利用的CPU資源!

我們來看一下ForkJoin的實現:實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,相反Action則沒有

侷限性:

1、任務只能使用fork和join作為同步機制,如果使用了其他同步機制,當他們在同步操作時,工作執行緒就不能執行其他任務了。比如在fork框架使任務進入了睡眠,那麼在睡眠期間內在執行這個任務的執行緒將不會執行其他任務了。

2、我們所拆分的任務不應該去執行IO操作,如讀和寫資料檔案。

3、任務不能丟擲檢查異常。必須通過必要的程式碼來處理他們。

框架核心:

核心有兩個類:

ForkJoinPool | ForkJoinTask ForkJoinPool:負責來做實現,包括工作竊取演算法、管理工作執行緒和提供關於任務的狀態以及他們的執行資訊。

ForkJoinTask:提供在任務中執行fork和join的機制。

測試一下:

public class ForkJoinWork extends RecursiveTask<Long>{


    private Long start;//起始值
    private Long end;//結束值
    public static final  Long critical = 5000L;//臨界值 ,設定不大於這個值就不分裂

    public ForkJoinWork(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //判斷是否是拆分完畢
        Long lenth = end - start;
        if(lenth<=critical){
            //如果拆分完畢就相加
            Long sum = 0L;
            for (Long i = start;i<=end;i++){
                sum += i;
            }
            return sum;
        }else {
            //沒有拆分完畢就開始拆分
            Long middle = (end + start)/2;//計算的兩個值的中間值
            ForkJoinWork right = new ForkJoinWork(start,middle);
            right.fork();//拆分,並壓入執行緒佇列
            ForkJoinWork left = new ForkJoinWork(middle+1,end);
            left.fork();//拆分,並壓入執行緒佇列

            //合併
            return right.join() + left.join();
        }
    }

        public static void main(String[] args) {
             long l = System.currentTimeMillis();
             ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支援
             ForkJoinTask<Long> task = new ForkJoinWork(0L,10000000000L);//引數為起始值與結束值
             Long invoke = forkJoinPool.invoke(task);
             long l1 = System.currentTimeMillis();
             System.out.println("invoke = " + invoke+"  time: " + (l1-l));
        }

}

測試程式碼

public class ForkJoinWorkTest {
    
    public static void main(String[] args) {
        test();
//        test2();
//        test3();
    }
    
    
    public static void test() {
        // ForkJoin實現
        long l = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();// 實現ForkJoin 就必須有ForkJoinPool的支援
        ForkJoinTask<Long> task = new ForkJoinWork(0L, 10000000000L);// 引數為起始值與結束值
        Long invoke = forkJoinPool.invoke(task);
        long l1 = System.currentTimeMillis();
        System.out.println("ForkJoin實現 time: " + (l1 - l));
    }

    public static void test2() {
        // 普通執行緒實現
        Long x = 0L;
        Long y = 10000000000L;
        long l = System.currentTimeMillis();
        for (Long i = 0L; i <= y; i++) {
            x += i;
        }
        long l1 = System.currentTimeMillis();
        System.out.println("普通執行緒實現  time: " + (l1 - l));
    }

    public static void test3() {
        // Java 8 並行流的實現
        long l = System.currentTimeMillis();
        long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum);
        long l1 = System.currentTimeMillis();
        System.out.println("Java 8 並行流的實現 time: " + (l1 - l));
    }
}

結果:

  ForkJoin實現 time: 38798

普通執行緒實現 time: 58860

 Java 8 並行流的實現 time: 2375

我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當資料很大的時候ForkJoin比普通執行緒實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快

Java 8 中將並行流進行了優化,我們可以很容易的對資料進行並行流的操作,Stream API可以宣告性的通過parallel()與sequential()在並行流與穿行流中隨意切換!