1. 程式人生 > >Guava ListenableFuture實現非同步非阻塞呼叫

Guava ListenableFuture實現非同步非阻塞呼叫

為了保證系統響應迅速,需要尋找一種方法能夠使調取介面能夠非同步執行,而Java正好提供了類似的方法,在java.util.concurrent中包含了Future相關的類,運用其中的一些類可以進行非同步計算,以減少主執行緒的等待時間。比如啟動一個main方法,main中又包含了若干個其它任務,在不使用Java future的情況下,main方法中的任務會同步阻塞執行,一個執行完成後,才能去執行另一個;如果使用java future,則main方法中的任務會非同步執行,main方法不用等待一個任務的執行完成,只需往下執行就行。一個任務的執行結果又該怎麼獲取呢?這裡就需要用到Future介面中的isDone()方法來判斷任務是否執行完,如果執行完成則可獲取結果,如果沒有完成則需要等待。 可見雖然主執行緒中的多個任務是非同步執行,但是無法確定任務什麼時候執行完成,只能通過不斷去監聽以獲取結果,所以這裡是阻塞的。這樣,可能某一個任務執行時間很長會拖累整個主任務的執行。

前言

針對這樣的情況,google對java.util.concurrent中的許多類進行封裝,最終產生了google guava框架,其中com.google.common.util中的ListenableFuture就是本文要敘述的重點。檢視com.google.common.util,發現其中的很多類都是對java.util.concurrent的封裝,以增加特有的方法。ListenableFuture擴充套件了future方法,增加了addListener方法,該方法可以監聽執行緒,並通過回撥函式來獲取結果,達到執行緒之間非同步非阻塞執行。

首先,瞭解下同步、非同步、阻塞、非阻塞相關概念;其次,簡單介紹java future和guava future相關技術,並通過示例程式碼進一步對其進行理解;最後,對java future和guava future進行比較。

同步、非同步、阻塞、非阻塞

同步

同步:所謂同步,就是在發出一個功能呼叫時,在沒有得到結果之前,該呼叫就不返回。也就是必須一件一件事做,等前一件做完了才能做下一件事。

非同步

非同步:非同步的概念和同步相對。當一個非同步過程呼叫發出後,呼叫者不能立刻得到結果。實際處理這個呼叫的部件在完成後,通過狀態、通知和回撥來通知呼叫者。

阻塞

阻塞:阻塞呼叫是指呼叫結果返回之前,當前執行緒會被掛起(執行緒進入非可執行狀態,在這個狀態下,cpu不會給執行緒分配時間片,即執行緒暫停執行)。函式只有在得到結果之後才會返回。

非阻塞

非阻塞:非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前,該函式不會阻塞當前執行緒,而會立刻返回。

Java Future

Java Future 的特點,可以理解為(減少主函式的等待時間,使得原本需要等待的時間段可以處理其它事情)

Executors建立執行緒池的幾種常見方式

通過Executors可以建立不同類似的執行緒池,常見的大概有下表幾種型別,還有些可能未被列出。在實際應用中,個人感覺主要使用newCachedThreadPool和newFixedThreadPool來建立執行緒池。

類名 說明
newCachedThreadPool 快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就reuse;如果沒有,就建一個新的執行緒加入池中。快取型池子通常用於執行一些生存期很短的非同步型任務。因此在一些面向連線的daemon型SERVER中用得不多。能reuse的執行緒,必須是timeout IDLE內的池中執行緒,預設timeout為60s,超過這個IDLE時長,執行緒例項將被終止並移出池子。注意:放入CachedThreadPool的執行緒超過TIMEOUT不活動,其會自動被終止。
newFixedThreadPool 和cacheThreadPool類似,有可用的執行緒就使用,但不能隨時建新的執行緒。其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待,直到當前的執行緒中某個執行緒終止直接被移出池子。cache池和fixed池呼叫的是同一個底層池,只不過引數不同:fixed池執行緒數固定,並且是0秒IDLE(無IDLE)。所以FixedThreadPool多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器。cache池執行緒數支援0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE。
ScheduledThreadPool 排程型執行緒池。這個池子裡的執行緒可以按schedule依次delay執行,或週期執行。
SingleThreadExecutor 單例執行緒,任意時間池中只能有一個執行緒。用的是和cache池和fixed池相同的底層池,但執行緒數目是1-1,0秒IDLE(無IDLE)。

Executors建立執行緒池原始碼

//呼叫newCachedThreadPool方法,可以建立一個緩衝型執行緒池,而在改方法中通過傳參建立一個ThreadPoolExecutor,我麼你會很奇怪明明返回的是一個ExecutorService,怎麼會建立了一個ThreadPoolExecutor呢?
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, 
                   TimeUnit.SECONDS, new SynchronousQueue<Runnable());
}

// ThreadPoolExecutor繼承了抽象的service類AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {}

//AbstractExecutorService實現了ExecutorService介面
public abstract class AbstractExecutorService implements ExecutorService {}

//所以ExecutorService其實是ThreadPoolExecutor的基類,這也就解釋清楚了

ExecutorService(執行緒池)

ExecutorService是一個介面,它繼承了Executor,在原有execute方法的基礎上新增了submit方法,傳入一個任務,該方法能夠返回一個Future物件,可以獲取非同步計算結果。

//ExecutorService繼承了Executor,並擴充套件了新方法。
public interface ExecutorService extends Executor { }

//Executor中的方法
void execute(Runnable command);

//增加了submit方法,該方法傳任務來獲取Future物件,而Future物件中可以獲取任務的執行結果
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);

Future(獲取非同步計算結果)

Future介面中有下表所示方法,可以獲取當前正在執行的任務相關資訊。

方法 說明
boolean cancel(boolean interruptIf) 取消任務的執行
boolean isCancelled() 任務是否已取消,任務正常完成前將其取消,返回 true
boolean isDone() 任務是否已完成,任務正常終止、異常或取消,返回true
V get() 等待任務結束,然後獲取V型別的結果
V get(long timeout, TimeUnit unit) 獲取結果,設定超時時間

FutureTask

Executor框架利用FutureTask來完成非同步任務,並可以用來進行任何潛在的耗時的計算。一般FutureTask多用於耗時的計算,主執行緒可以在完成自己的任務後,再去獲取結果。

FutureTask包裝了Callable和Runnable介面物件,提供對Future介面的基本實現,開始、取消計算、查詢計算是否完成、獲取計算結果。僅當計算完成時才能檢索結果,當計算沒有完成時,該方法會一直阻塞直到任務轉入完成狀態。一旦完成計算,不能夠重新開始或取消計算。通過Excutor(執行緒池)來執行,也可傳遞給Thread物件執行。如果在主執行緒中需要執行比較耗時的操作時,但又不想阻塞主執行緒時,可以把這些作業交給Future物件在後臺完成,當主執行緒將來需要時,就可以通過Future物件獲得後臺作業的計算結果或者執行狀態。

//通過傳入任務來構造FutureTask
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}

//FutureTask中同樣有獲取當前任務狀態的方法
public boolean isCancelled(){}
public boolean isDone() {}
public boolean cancel(boolean mayInterruptIfRunning) {}

//FutureTask實現RunnableFuture
public class FutureTask<V> implements RunnableFuture<V> {}

//RunnableFuture繼承Runnable和Future
public interface RunnableFuture<V> extends Runnable, Future<V> 

示例程式碼

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestFuture {
    // 建立執行緒池
    final static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Long t1 = System.currentTimeMillis();

        // 任務1
        Future<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        while (true) {
            if (booleanTask.isDone() && !booleanTask.isCancelled()) {
                //模擬耗時
                Thread.sleep(500);
                Boolean result = booleanTask.get();
                System.err.println("BooleanTask: " + result);
                break;
            }
        }

        // 任務2
        Future<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        while (true) {
            if (stringTask.isDone() && !stringTask.isCancelled()) {
                String result = stringTask.get();
                System.err.println("StringTask: " + result);
                break;
            }
        }



        // 任務3
        Future<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        while (true) {
            if (integerTask.isDone() && !integerTask.isCancelled()) {
                Integer result = integerTask.get();
                System.err.println("IntegerTask: " + result);
                break;
            }
        }

        // 執行時間
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}

Guava Future

Guava Future的特點,(減少主函式的等待時間,使得多工能夠非同步非阻塞執行)

ListenableFuture是可以監聽的Future,它是對java原生Future的擴充套件增強。Future表示一個非同步計算任務,當任務完成時可以得到計算結果。如果希望計算完成時馬上就拿到結果展示給使用者或者做另外的計算,就必須使用另一個執行緒不斷的查詢計算狀態。這樣做會使得程式碼複雜,且效率低下。如果使用ListenableFuture,Guava會幫助檢測Future是否完成了,如果完成就自動呼叫回撥函式,這樣可以減少併發程式的複雜度。

MoreExecutors

該類是final型別的工具類,提供了很多靜態方法。例如listeningDecorator方法初始化ListeningExecutorService方法,使用此例項submit方法即可初始化ListenableFuture物件。

ListeningExecutorService

該類是對ExecutorService的擴充套件,重寫ExecutorService類中的submit方法,返回ListenableFuture物件。

ListenableFuture

該介面擴充套件了Future介面,增加了addListener方法,該方法在給定的excutor上註冊一個監聽器,當計算完成時會馬上呼叫該監聽器。不能夠確保監聽器執行的順序,但可以在計算完成時確保馬上被呼叫。

FutureCallback

該介面提供了OnSuccess和OnFailuren方法。獲取非同步計算的結果並回調。

Futures

該類提供和很多實用的靜態方法以供使用。

ListenableFutureTask

該類擴充套件了FutureTask類並實現ListenableFuture介面,增加了addListener方法。

例項程式碼

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class TestListenableFuture2 {
    // 建立執行緒池
    final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    public static void main(String[] args) throws Exception {
        Long t1 = System.currentTimeMillis();
        // 任務1
        ListenableFuture<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        Futures.addCallback(booleanTask, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(Boolean result) {
                System.err.println("BooleanTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任務2
        ListenableFuture<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        Futures.addCallback(stringTask, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.err.println("StringTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任務3
        ListenableFuture<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        Futures.addCallback(integerTask, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.err.println("IntegerTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 執行時間
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}

Java Future 和 Guava Future的對比

Future 具有侷限性。在實際應用中,當需要下載大量圖片或視訊時,可以使用多執行緒去下載,提交任務下載後,可以從多個Future中獲取下載結果,由於Future獲取任務結果是阻塞的,所以將會依次呼叫Future.get()方法,這樣的效率會很低。很可能第一個下載速度很慢,則會拖累整個下載速度。

Future主要功能在於獲取任務執行結果和對非同步任務的控制。但如果要獲取批量任務的執行結果,從上面的例子我們已經可以看到,單使用 Future 是很不方便的。其主要原因在於:一方面是沒有好的方法去判斷第一個完成的任務;另一方面是 Future的get方法 是阻塞的,使用不當會造成執行緒的浪費。第一個問題可以用 CompletionService 解決,CompletionService 提供了一個 take() 阻塞方法,用以依次獲取所有已完成的任務。第二個問題可以用 Google Guava 庫所提供的 ListeningExecutorService 和 ListenableFuture 來解決。除了獲取批量任務執行結果時不便,Future另外一個不能做的事便是防止任務的重複提交。要做到這件事就需要 Future 最常見的一個實現類 FutureTask 了。Future只實現了非同步,而沒有實現回撥,主執行緒get時會阻塞,可以輪詢以便獲取非同步呼叫是否完成.

在實際的使用中建議使用Guava ListenableFuture來實現非同步非阻塞,目的就是多工非同步執行,通過回撥的方方式來獲取執行結果而不需輪詢任務狀態

參考部落格地址

http://blog.csdn.net/pistolove/article/details/51232004#t2