1. 程式人生 > >Java進階之FutureTask的用法及解析

Java進階之FutureTask的用法及解析

1 FutureTask概念

FutureTask一個可取消的非同步計算,FutureTask 實現了Future的基本方法,提空 start cancel 操作,可以查詢計算是否已經完成,並且可以獲取計算的結果。結果只可以在計算完成之後獲取,get方法會阻塞當計算沒有完成的時候,一旦計算已經完成,那麼計算就不能再次啟動或是取消。

一個FutureTask 可以用來包裝一個 Callable 或是一個runnable物件。因為FurtureTask實現了Runnable方法,所以一個 FutureTask可以提交(submit)給一個Excutor執行(excution).

2 FutureTask使用場景

FutureTask可用於非同步獲取執行結果或取消執行任務的場景。通過傳入Runnable或者Callable的任務給FutureTask,直接呼叫其run方法或者放入執行緒池執行,之後可以在外部通過FutureTask的get方法非同步獲取執行結果,因此,FutureTask非常適合用於耗時的計算,主執行緒可以在完成自己的任務後,再去獲取結果。另外,FutureTask還可以確保即使呼叫了多次run方法,它都只會執行一次Runnable或者Callable任務,或者通過cancel取消FutureTask的執行等。

2.1 FutureTask執行多工計算的使用場景

利用FutureTask和ExecutorService,可以用多執行緒的方式提交計算任務,主執行緒繼續執行其他任務,當主執行緒需要子執行緒的計算結果時,在非同步獲取子執行緒的執行結果。

public class FutureTest1 {

    public static void main(String[] args) {
        Task task = new Task();// 新建非同步任務
        FutureTask<Integer> future = new FutureTask<Integer>(task) {
            // 非同步任務執行完成,回撥
            @Override
            protected void done() {
                try
{ System.out.println("future.done():" + get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }; // 建立執行緒池(使用了預定義的配置) ExecutorService executor = Executors.newCachedThreadPool(); executor.execute(future); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } // 可以取消非同步任務 // future.cancel(true); try { // 阻塞,等待非同步任務執行完畢-獲取非同步任務的返回值 System.out.println("future.get():" + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 非同步任務 static class Task implements Callable<Integer> { // 返回非同步任務的執行結果 @Override public Integer call() throws Exception { int i = 0; for (; i < 10; i++) { try { System.out.println(Thread.currentThread().getName() + "_" + i); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } return i; } } }

這裡寫圖片描述

2.2 FutureTask在高併發環境下確保任務只執行一次

在很多高併發的環境下,往往我們只需要某些任務只執行一次。這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設有一個帶key的連線池,當key存在時,即直接返回key對應的物件;當key不存在時,則建立連線。對於這樣的應用場景,通常採用的方法為使用一個Map物件來儲存key和連線池對應的對應關係,典型的程式碼如下面所示:

private Map<String, Connection> connectionPool = new HashMap<String, Connection>();  
private ReentrantLock lock = new ReentrantLock();  

public Connection getConnection(String key){  
    try{  
        lock.lock();  
        if(connectionPool.containsKey(key)){  
            return connectionPool.get(key);  
        }  
        else{  
            //建立 Connection  
            Connection conn = createConnection();  
            connectionPool.put(key, conn);  
            return conn;  
        }  
    }  
    finally{  
        lock.unlock();  
    }  
}  

//建立Connection(根據業務需求,自定義Connection)  
private Connection createConnection(){  
    return null;  
}  

在上面的例子中,我們通過加鎖確保高併發環境下的執行緒安全,也確保了connection只建立一次,然而確犧牲了效能。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,效能大大提高,但是在高併發的情況下有可能出現Connection被建立多次的現象。這時最需要解決的問題就是當key不存在時,建立Connection的動作能放在connectionPool之後執行,這正是FutureTask發揮作用的時機,基於ConcurrentHashMap和FutureTask的改造程式碼如下:

private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();  

public Connection getConnection(String key) throws Exception{  
    FutureTask<Connection>connectionTask=connectionPool.get(key);  
    if(connectionTask!=null){  
        return connectionTask.get();  
    }  
    else{  
        Callable<Connection> callable = new Callable<Connection>(){  
            @Override  
            public Connection call() throws Exception {  
                // TODO Auto-generated method stub  
                return createConnection();  
            }  
        };  
        FutureTask<Connection>newTask = new FutureTask<Connection>(callable);  
        connectionTask = connectionPool.putIfAbsent(key, newTask);  
        if(connectionTask==null){  
            connectionTask = newTask;  
            connectionTask.run();  
        }  
        return connectionTask.get();  
    }  
}  

//建立Connection(根據業務需求,自定義Connection)  
private Connection createConnection(){  
    return null;  
}  

經過這樣的改造,可以避免由於併發帶來的多次建立連線及鎖的出現。

3 部分原始碼分析

3.1 構造方法

public FutureTask(Runnable runnable, V result) {  
        this.callable = Executors.callable(runnable, result);  
        this.state = NEW;       // ensure visibility of callable  
 } 

3.2 cancel

//這個方法有一個引數 是否中斷running  
     public boolean cancel(boolean mayInterruptIfRunning) {  
          /** 
          * 這個有點暈啊邏輯關係是 
          * 等價與 if(state!=new || !UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)) 
          * 這個意思是 如果state不是new 那麼就退出方法,這時的任務任務坑是已經完成了 或是被取消了 或是被中斷了 
          * 如果是state 是new 就設定state 為中斷狀態 或是取消狀態 
          * 
          **/  
        if (!(state == NEW &&  
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,  
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  
            return false;  
        try {    // in case call to interrupt throws exception  
            //如果是可中斷 那麼就 呼叫系統中斷方法 然後把狀態設定成INTERRUPTED  
            if (mayInterruptIfRunning) {  
                try {  
                    Thread t = runner;  
                    if (t != null)  
                        t.interrupt();  
                } finally { // final state  
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);  
                }  
            }  
        } finally {  
            finishCompletion();  
        }  
        return true;  
    }  

4 參考連結