Java併發程式設計與技術內幕:Callable、Future、FutureTask、CompletionService
在上一文章中,筆者介紹了執行緒池及其內部的原理。今天主要講的也是和執行緒相關的內容。一般情況下,使用Runnable介面、Thread實現的執行緒我們都是無法返回結果的。但是如果對一些場合需要執行緒返回的結果。就要使用用Callable、Future、FutureTask、CompletionService這幾個類。Callable只能在ExecutorService的執行緒池中跑,但有返回結果,也可以通過返回的Future物件查詢執行狀態。Future 本身也是一種設計模式,它是用來取得非同步任務的結果,
一、基本原始碼
所以來看看它們的原始碼資訊
1、Callable
看看其原始碼:
public interface Callable<V> {
V call() throws Exception;
}
它只有一個call方法,並且有一個返回V,是泛型。可以認為這裡返回V就是執行緒返回的結果。
ExecutorService介面:執行緒池執行排程框架
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
2、Future
Future是我們最常見的
public interface Future<V> { //試圖取消對此任務的執行。如果任務已完成、或已取消,或者由於某些其他原因而無法取消,則此嘗試將失敗。當呼叫 cancel 時,如果呼叫成功,而此任務尚未啟動 //,則此任務將永不執行。如果任務已經啟動,則 //mayInterruptIfRunning 引數確定是否應該以試圖停止任務的方式來中斷執行此任務的執行緒。此方法返回後,對 isDone() 的後續呼叫將始終返回 true。如果此方法返 //回 true,則對 isCancelled() //的後續呼叫將始終返回 true。 boolean cancel(boolean mayInterruptIfRunning); //如果在任務正常完成前將其取消,則返回 true。 boolean isCancelled(); //如果任務已完成,則返回 true。 可能由於正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true。 boolean isDone(); //等待執行緒結果返回,會阻塞 V get() throws InterruptedException, ExecutionException; //設定超時時間 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
3、FutureTask
從原始碼看其繼承關係如下:
其原始碼如下:
public class FutureTask<V> implements RunnableFuture<V> {
//真正用來執行執行緒的類
private final Sync sync;
//構造方法1,從Callable來建立FutureTask
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
//構造方法2,從Runnable來建立FutureTask,V就是執行緒執行返回結果
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
//和Futrue一樣
public boolean isCancelled() {
return sync.innerIsCancelled();
}
//和Futrue一樣
public boolean isDone() {
return sync.innerIsDone();
}
//和Futrue一樣
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
//和Futrue一樣
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
//和Futrue一樣
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}
//執行緒結束後的操作
protected void done() { }
//設定結果
protected void set(V v) {
sync.innerSet(v);
}
//設定異常
protected void setException(Throwable t) {
sync.innerSetException(t);
}
//執行緒執行入口
public void run() {
sync.innerRun();
}
//重置
protected boolean runAndReset() {
return sync.innerRunAndReset();
}
//這個類才是真正執行、關閉執行緒的類
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;
//執行緒執行狀態
private static final int RUNNING = 1;
private static final int RAN = 2;
private static final int CANCELLED = 4;
private final Callable<V> callable;
private V result;
private Throwable exception;
//執行緒例項
private volatile Thread runner;
//建構函式
Sync(Callable<V> callable) {
this.callable = callable;
}
。。。。
}
}
FutureTask類是Future 的一個實現,並實現了Runnable,所以可通過Excutor(執行緒池) 來執行,也可傳遞給Thread物件執行。如果在主執行緒中需要執行比較耗時的操作時,但又不想阻塞主執行緒時,可以把這些作業交給Future物件在後臺完成,當主執行緒將來需要時,就可以通過Future物件獲得後臺作業的計算結果或者執行狀態。 Executor框架利用FutureTask來完成非同步任務,並可以用來進行任何潛在的耗時的計算。一般FutureTask多用於耗時的計算,主執行緒可以在完成自己的任務後,再去獲取結果。FutureTask類既可以使用new Thread(Runnable r)放到一個新執行緒中跑,也可以使用ExecutorService.submit(Runnable r)放到執行緒池中跑,而且兩種方式都可以獲取返回結果,但實質是一樣的,即如果要有返回結果那麼建構函式一定要注入一個Callable物件。二、應用例項
1、Future例項
package com.func.axc.futuretask;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 功能概要:
*
* @author linbingwen
* @since 2016年6月8日
*/
public class FutureTest {
/**
* @author linbingwen
* @since 2016年6月8日
* @param args
*/
public static void main(String[] args) {
System.out.println("main Thread begin at:"+ System.nanoTime());
ExecutorService executor = Executors.newCachedThreadPool();
HandleCallable task1 = new HandleCallable("1");
HandleCallable task2 = new HandleCallable("2");
HandleCallable task3 = new HandleCallable("3");
Future<Integer> result1 = executor.submit(task1);
Future<Integer> result2 = executor.submit(task2);
Future<Integer> result3 = executor.submit(task3);
executor.shutdown();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
System.out.println("task1執行結果:"+result1.get());
System.out.println("task2執行結果:"+result2.get());
System.out.println("task3執行結果:"+result3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("main Thread finish at:"+ System.nanoTime());
}
}
class HandleCallable implements Callable<Integer>{
private String name;
public HandleCallable(String name) {
this.name = name;
}
@Override
public Integer call() throws Exception {
System.out.println("task"+ name + "開始進行計算");
Thread.sleep(3000);
int sum = new Random().nextInt(300);
int result = 0;
for (int i = 0; i < sum; i++)
result += i;
return result;
}
}
執行結果:2、FutureTask
方法一、直接通過New Thread來啟動執行緒
package com.func.axc.futuretask;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import org.springframework.scheduling.config.Task;
/**
* 功能概要:
*
* @author linbingwen
* @since 2016年5月31日
*/
public class FutrueTaskTest {
public static void main(String[] args) {
//採用直接啟動執行緒的方法
System.out.println("main Thread begin at:"+ System.nanoTime());
MyTask task1 = new MyTask("1");
FutureTask<Integer> result1 = new FutureTask<Integer>(task1);
Thread thread1 = new Thread(result1);
thread1.start();
MyTask task2 = new MyTask("2");
FutureTask<Integer> result2 = new FutureTask<Integer>(task2);
Thread thread2 = new Thread(result2);
thread2.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
System.out.println("task1返回結果:" + result1.get());
System.out.println("task2返回結果:" + result2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("main Thread finish at:"+ System.nanoTime());
}
}
class MyTask implements Callable<Integer> {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public Integer call() throws Exception {
System.out.println("task"+ name + "開始進行計算");
Thread.sleep(3000);
int sum = new Random().nextInt(300);
int result = 0;
for (int i = 0; i < sum; i++)
result += i;
return result;
}
}
執行結果:
方法二、通過執行緒池來啟動執行緒
package com.func.axc.futuretask;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 功能概要:
*
* @author linbingwen
* @since 2016年5月31日
*/
public class FutrueTaskTest2 {
public static void main(String[] args) {
System.out.println("main Thread begin at:"+ System.nanoTime());
ExecutorService executor = Executors.newCachedThreadPool();
MyTask2 task1 = new MyTask2("1");
MyTask2 task2 = new MyTask2("2");
Future<Integer> result1 = executor.submit(task1);
Future<Integer> result2 = executor.submit(task2);
executor.shutdown();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
System.out.println("task1返回結果:" + result1.get());
System.out.println("task2返回結果:" + result2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("main Thread finish at:"+ System.nanoTime());
}
}
class MyTask2 implements Callable<Integer> {
private String name;
public MyTask2(String name) {
this.name = name;
}
@Override
public Integer call() throws Exception {
System.out.println("task"+ name + "開始進行計算");
Thread.sleep(3000);
int sum = new Random().nextInt(300);
int result = 0;
for (int i = 0; i < sum; i++)
result += i;
return result;
}
}
執行結果:三、CompletionService
這個光看其單詞,就可以猜到它應該是一個執行緒執行完成後相關的服務,沒錯。它就是一個將執行緒池執行結果放入到一個Blockqueueing的類。那麼它和Future或FutureTask有什麼不同呢?其實在上面的例子中,筆者用的例項可能不太好。如果線上程池中我們使用Future或FutureTask來取得返回結果,比如。我們開了100條執行緒。但是這些執行緒的執行時間是未知的。但是我們又需要返回結果。每執行一條執行緒就根據結果做一次相應的操作。如果是Future或FutureTask。我們只能通過一個迴圈,不斷的遍歷執行緒池裡的執行緒。取得其執行狀態。然後再來取結果。這樣效率就太低了,有可能發生一條執行緒執行完畢了,但我們不能立刻知道它處理完成了。還得通過一個迴圈來判斷。基本上面的這種問題,所以出了CompletionService。
CompletionService原理不是很難,它就是將一組執行緒的執行結果放入一個BlockQueueing當中。這裡執行緒的執行結果放入到Blockqueue的順序只和這個執行緒的執行時間有關。和它們的啟動順序無關。並且你無需自己在去寫很多判斷哪個執行緒是否執行完成,它裡面會去幫你處理。
首先看看其原始碼:
package java.util.concurrent;
public interface CompletionService<V> {
//提交執行緒任務
Future<V> submit(Callable<V> task);
//提交執行緒任務
Future<V> submit(Runnable task, V result);
//阻塞等待
Future<V> take() throws InterruptedException;
//非阻塞等待
Future<V> poll();
//帶時間的非阻塞等待
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
上面只是一個介面類,其實現類如下:package java.util.concurrent;
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;//執行緒池類
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;//存放執行緒執行結果的阻塞佇列
//內部封裝的一個用來執執行緒的FutureTask
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }//執行緒執行完成後呼叫此函式將結果放入阻塞佇列
private final Future<V> task;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
//建構函式,這裡一般傳入一個執行緒池物件executor的實現類
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();//預設的是連結串列阻塞佇列
}
//建構函式,可以自己設定阻塞佇列
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
//提交執行緒任務,其實最終還是executor去提交
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
//提交執行緒任務,其實最終還是executor去提交
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
從原始碼中可以知道。最終還是執行緒還是提交到Executor當中去執行,所以建構函式中需要Executor引數來例項化。而每次有執行緒執行完成後往阻塞佇列新增一個Future。這是上面的RunnableFuture,這是每次往執行緒池是放入的執行緒。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
接下來以兩個例子來說明其使用
1、與Future的區別使用:
自定義一個Callable
class HandleFuture<Integer> implements Callable<Integer> {
private Integer num;
public HandleFuture(Integer num) {
this.num = num;
}
@Override
public Integer call() throws Exception {
Thread.sleep(3*100);
System.out.println(Thread.currentThread().getName());
return num;
}
}
首先是Futuer public static void FutureTest() throws InterruptedException, ExecutionException {
System.out.println("main Thread begin:");
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Integer>> result = new ArrayList<Future<Integer>>();
for (int i = 0;i<10;i++) {
Future<Integer> submit = executor.submit(new HandleFuture(i));
result.add(submit);
}
executor.shutdown();
for (int i = 0;i<10;i++) {//一個一個等待返回結果
System.out.println("返回結果:"+result.get(i).get());
}
System.out.println("main Thread end:");
}
執行結果:從輸出結果可以看出,我們只能一個一個阻塞的取出。這中間肯定會浪費一定的時間在等待上。如7返回了。但是前面1-6都沒有返回。那麼7就得等1-6輸出才能輸出。
接下來換成CompletionService來做:
public static void CompleTest() throws InterruptedException, ExecutionException {
System.out.println("main Thread begin:");
ExecutorService executor = Executors.newCachedThreadPool();
// 構建完成服務
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
for (int i = 0;i<10;i++) {
completionService.submit(new HandleFuture(i));
}
for (int i = 0;i<10;i++) {//一個一個等待返回結果
System.out.println("返回結果:"+completionService.take().get());
}
System.out.println("main Thread end:");
}
輸出結果:可以看出,結果的輸出和執行緒的放入順序無關係。每一個執行緒執行成功後,立刻就輸出。