Java進階之FutureTask的用法及解析
1 FutureTask概念
FutureTask一個可取消的非同步計算,FutureTask 實現了Future的基本方法,提空 start cancel 操作,可以查詢計算是否已經完成,並且可以獲取計算的結果。結果只可以在計算完成之後獲取,get方法會阻塞當計算沒有完成的時候,一旦計算已經完成,那麼計算就不能再次啟動或是取消。
一個FutureTask 可以用來包裝一個 Callable 或是一個runnable物件。因為FurtureTask實現了Runnable方法,所以一個 FutureTask可以提交(submit)給一個Excutor執行(excution).
2 FutureTask使用場景
FutureTask可用於非同步獲取執行結果或取消執行任務的場景。通過傳入Runnable或者Callable的任務給FutureTask,直接呼叫其run方法或者放入執行緒池執行,之後可以在外部通過FutureTask的get方法非同步獲取執行結果,因此,FutureTask非常適合用於耗時的計算,主執行緒可以在完成自己的任務後,再去獲取結果。另外,FutureTask還可以確保即使呼叫了多次run方法,它都只會執行一次Runnable或者Callable任務,或者通過cancel取消FutureTask的執行等。
2.1 FutureTask執行多工計算的使用場景
利用FutureTask和ExecutorService,可以用多執行緒的方式提交計算任務,主執行緒繼續執行其他任務,當主執行緒需要子執行緒的計算結果時,在非同步獲取子執行緒的執行結果。
public class FutureTest1 {
public static void main(String[] args) {
Task task = new Task();// 新建非同步任務
FutureTask<Integer> future = new FutureTask<Integer>(task) {
// 非同步任務執行完成,回撥
@Override
protected void done() {
try {
System.out.println("future.done():" + get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
// 建立執行緒池(使用了預定義的配置)
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(future);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// 可以取消非同步任務
// future.cancel(true);
try {
// 阻塞,等待非同步任務執行完畢-獲取非同步任務的返回值
System.out.println("future.get():" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 非同步任務
static class Task implements Callable<Integer> {
// 返回非同步任務的執行結果
@Override
public Integer call() throws Exception {
int i = 0;
for (; i < 10; i++) {
try {
System.out.println(Thread.currentThread().getName() + "_"
+ i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return i;
}
}
}
2.2 FutureTask在高併發環境下確保任務只執行一次
在很多高併發的環境下,往往我們只需要某些任務只執行一次。這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設有一個帶key的連線池,當key存在時,即直接返回key對應的物件;當key不存在時,則建立連線。對於這樣的應用場景,通常採用的方法為使用一個Map物件來儲存key和連線池對應的對應關係,典型的程式碼如下面所示:
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock();
public Connection getConnection(String key){
try{
lock.lock();
if(connectionPool.containsKey(key)){
return connectionPool.get(key);
}
else{
//建立 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
}
finally{
lock.unlock();
}
}
//建立Connection(根據業務需求,自定義Connection)
private Connection createConnection(){
return null;
}
在上面的例子中,我們通過加鎖確保高併發環境下的執行緒安全,也確保了connection只建立一次,然而確犧牲了效能。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,效能大大提高,但是在高併發的情況下有可能出現Connection被建立多次的現象。這時最需要解決的問題就是當key不存在時,建立Connection的動作能放在connectionPool之後執行,這正是FutureTask發揮作用的時機,基於ConcurrentHashMap和FutureTask的改造程式碼如下:
private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
public Connection getConnection(String key) throws Exception{
FutureTask<Connection>connectionTask=connectionPool.get(key);
if(connectionTask!=null){
return connectionTask.get();
}
else{
Callable<Connection> callable = new Callable<Connection>(){
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection>newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if(connectionTask==null){
connectionTask = newTask;
connectionTask.run();
}
return connectionTask.get();
}
}
//建立Connection(根據業務需求,自定義Connection)
private Connection createConnection(){
return null;
}
經過這樣的改造,可以避免由於併發帶來的多次建立連線及鎖的出現。
3 部分原始碼分析
3.1 構造方法
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
3.2 cancel
//這個方法有一個引數 是否中斷running
public boolean cancel(boolean mayInterruptIfRunning) {
/**
* 這個有點暈啊邏輯關係是
* 等價與 if(state!=new || !UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
* 這個意思是 如果state不是new 那麼就退出方法,這時的任務任務坑是已經完成了 或是被取消了 或是被中斷了
* 如果是state 是new 就設定state 為中斷狀態 或是取消狀態
*
**/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
//如果是可中斷 那麼就 呼叫系統中斷方法 然後把狀態設定成INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}