高併發第十二彈:併發容器J.U.C -- Executor元件FutureTask、ForkJoin
從本章開始就要說 Executor 的東西了.本次講的是一個很常用的FutureTask,和一個不是那麼常用的ForkJoin,我們現在就來介紹吧
引言
大部分時候建立執行緒的2種方式,一種是直接繼承Thread,另外一種就是實現Runnable介面。但是這兩種方式都有一個缺陷就是:在執行完任務之後無法獲取執行結果。
所以後期就提供了Callable和Future,通過它們可以在任務執行完畢之後得到任務執行結果。FutureTask又是集大成者.
一個一個來介紹
Callable:只有一個帶返回值的call方法
@FunctionalInterface public interfaceCallable<V> { V call() throws Exception; }
那麼怎麼使用Callable呢?一般情況下是配合ExecutorService來使用的,在ExecutorService介面中聲明瞭若干個submit方法的過載版本:
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task)
第一個submit方法裡面的引數型別就是Callable。
暫時只需要知道Callable一般是和ExecutorService配合來使用的,具體的使用方法講在後面講述。
一般情況下我們使用第一個submit方法和第三個submit方法,第二個submit方法很少使用。
Future介面
public interface Future<V> { // 取消任務 boolean cancel(boolean mayInterruptIfRunning); //取消狀態 boolean isCancelled();//是否完成 boolean isDone();//方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回 V get()throws InterruptedException, ExecutionException;// 用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
也就是說Future提供了三種功能:
1)判斷任務是否完成;
2)能夠中斷任務;
3)能夠獲取任務執行結果。
因為Future只是一個介面,所以是無法直接用來建立物件使用的,因此就有了下面的FutureTask。
FutureTask
FutureTask這個才是真實使用者.FutureTask實現了RunnableFuture介面,而RunnableFuture介面繼承了Runnable與Future介面,所以它既可以作為Runnable被執行緒中執行,又可以作為callable獲得返回值
public class FutureTask<V> implements RunnableFuture<V> { ... } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
FutureTask的兩個構造類
public FutureTask(Callable<V> callable) { } public FutureTask(Runnable runnable, V result) { }
使用例項
方法一:使用Callable
ExecutorService executor = Executors.newCachedThreadPool(); Callable<String> task = new Callable<String>() { @Override public String call() throws Exception { return "task 返回結果"; } }; Future<String> result = executor.submit(task); executor.shutdown(); if(result.isDone()) { System.out.println(result.get()); }
方式二:使用FutureTask
FutureTask<String> task = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { return "aaaa"; } }); task.run(); System.out.println(task.get());
fork/join:
Fork/Join框架:在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這裡就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join彙總。
Fork/Join與傳統執行緒池的區別!
Fork/Join採用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到執行緒佇列中,然後再從一個隨即執行緒中偷一個並把它加入自己的佇列中。
就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的佇列中,對於傳統的執行緒,ForkJoin更有效的利用的CPU資源!
我們來看一下ForkJoin的實現:實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,相反Action則沒有
侷限性:
1、任務只能使用fork和join作為同步機制,如果使用了其他同步機制,當他們在同步操作時,工作執行緒就不能執行其他任務了。比如在fork框架使任務進入了睡眠,那麼在睡眠期間內在執行這個任務的執行緒將不會執行其他任務了。
2、我們所拆分的任務不應該去執行IO操作,如讀和寫資料檔案。
3、任務不能丟擲檢查異常。必須通過必要的程式碼來處理他們。
框架核心:
核心有兩個類:
ForkJoinPool | ForkJoinTask ForkJoinPool:負責來做實現,包括工作竊取演算法、管理工作執行緒和提供關於任務的狀態以及他們的執行資訊。
ForkJoinTask:提供在任務中執行fork和join的機制。
測試一下:
public class ForkJoinWork extends RecursiveTask<Long>{ private Long start;//起始值 private Long end;//結束值 public static final Long critical = 5000L;//臨界值 ,設定不大於這個值就不分裂 public ForkJoinWork(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { //判斷是否是拆分完畢 Long lenth = end - start; if(lenth<=critical){ //如果拆分完畢就相加 Long sum = 0L; for (Long i = start;i<=end;i++){ sum += i; } return sum; }else { //沒有拆分完畢就開始拆分 Long middle = (end + start)/2;//計算的兩個值的中間值 ForkJoinWork right = new ForkJoinWork(start,middle); right.fork();//拆分,並壓入執行緒佇列 ForkJoinWork left = new ForkJoinWork(middle+1,end); left.fork();//拆分,並壓入執行緒佇列 //合併 return right.join() + left.join(); } } public static void main(String[] args) { long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支援 ForkJoinTask<Long> task = new ForkJoinWork(0L,10000000000L);//引數為起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke+" time: " + (l1-l)); } }
測試程式碼
public class ForkJoinWorkTest { public static void main(String[] args) { test(); // test2(); // test3(); } public static void test() { // ForkJoin實現 long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();// 實現ForkJoin 就必須有ForkJoinPool的支援 ForkJoinTask<Long> task = new ForkJoinWork(0L, 10000000000L);// 引數為起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("ForkJoin實現 time: " + (l1 - l)); } public static void test2() { // 普通執行緒實現 Long x = 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x += i; } long l1 = System.currentTimeMillis(); System.out.println("普通執行緒實現 time: " + (l1 - l)); } public static void test3() { // Java 8 並行流的實現 long l = System.currentTimeMillis(); long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); long l1 = System.currentTimeMillis(); System.out.println("Java 8 並行流的實現 time: " + (l1 - l)); } }
結果:
ForkJoin實現 time: 38798
普通執行緒實現 time: 58860
Java 8 並行流的實現 time: 2375
我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當資料很大的時候ForkJoin比普通執行緒實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快
Java 8 中將並行流進行了優化,我們可以很容易的對資料進行並行流的操作,Stream API可以宣告性的通過parallel()與sequential()在並行流與穿行流中隨意切換!