1. 程式人生 > >使用Guava+Spring實現非同步回撥操作,提高程式效能

使用Guava+Spring實現非同步回撥操作,提高程式效能

            

隨著移動網際網路的蓬勃發展,手機App層出不窮,其業務也隨之變得錯綜複雜。針對於開發人員來說,可能之前的一個業務只需要調取一次第三方介面以獲取資料,而如今隨著需求的增加,該業務需調取多個不同的第三方介面。通常,我們處理方法是讓程式碼同步順序的去調取這些介面。顯然,調取介面數量的增加必然會造成響應時間的增加,勢必會對系統性能造成一定影響。

      為了保證系統響應迅速,需要尋找一種方法能夠使調取介面能夠非同步執行,而java正好提供了類似的方法,在java.util.concurrent中包含了Future相關的類,運用其中的一些類可以進行非同步計算,以減少主執行緒的等待時間。比如啟動一個main方法,main中又包含了若干個其它任務,在不使用java future

的情況下,main方法中的任務會同步阻塞執行,一個執行完成後,才能去執行另一個;如果使用java future,則main方法中的任務會非同步執行,main方法不用等待一個任務的執行完成,只需往下執行就行。一個任務的執行結果又該怎麼獲取呢?這裡就需要用到Future介面中的isDone()方法來判斷任務是否執行完,如果完成完成則可獲取結果,如果沒有完成則需要等待,可見雖然主執行緒中的多個任務是非同步執行,但是無法確定任務什麼時候執行完成,只能通過不斷去監聽以獲取結果,所以這裡是阻塞的。這樣,可能某一個任務執行時間很長會拖累整個主任務的執行。

     針對這樣的情況,google對java.util.concurrent

中的許多類進行封裝,最終產生了google guava框架,其中com.google.common.util中的ListenableFuture就是本文要敘述的重點。檢視com.google.common.util,發現其中的很多類都是對java.util.concurrent的封裝,以增加特有的方法。ListenableFuture擴充套件了future方法,增加了addListener方法,該方法可以監聽執行緒,並通過回撥函式來獲取結果,達到執行緒之間非同步非阻塞執行。

     首先,瞭解下同步、非同步、阻塞、非阻塞相關概念;其次,簡單介紹java future和guava future相關技術,並通過示例程式碼進一步對其進行理解;最後,對java future和guava future進行比較。

        同步、非同步、阻塞、非阻塞

     同步:所謂同步,就是在發出一個功能呼叫時,在沒有得到結果之前,該呼叫就不返回。也就是必須一件一件事做,等前一件做完了才能做下一件事。

     非同步:非同步的概念和同步相對。當一個非同步過程呼叫發出後,呼叫者不能立刻得到結果。實際處理這個呼叫的部件在完成後,通過狀態、通知和回撥來通知呼叫者。

     阻塞:阻塞呼叫是指呼叫結果返回之前,當前執行緒會被掛起(執行緒進入非可執行狀態,在這個狀態下,cpu不會給執行緒分配時間片,即執行緒暫停執行)。函式只有在得到結果之後才會返回。

     非阻塞:非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前,該函式不會阻塞當前執行緒,而會立刻返回。

   Java  Future

減少主函式的等待時間,使得原本需要等待的時間段可以處理其它事情

1、Executors建立執行緒池的幾種常見方式

     通過Executors可以建立不同類似的執行緒池,常見的大概有下表幾種型別,還有些可能未被列出。在實際應用中,個人感覺主要使用newCachedThreadPool和newFixedThreadPool來建立執行緒池。

類名 說明
newCachedThreadPool 快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就reuse;如果沒有,就建一個新的執行緒加入池中。快取型池子通常用於執行一些生存期很短的非同步型任務。因此在一些面向連線的daemon型SERVER中用得不多。能reuse的執行緒,必須是timeout IDLE內的池中執行緒,預設timeout為60s,超過這個IDLE時長,執行緒例項將被終止並移出池子。注意:放入CachedThreadPool的執行緒超過TIMEOUT不活動,其會自動被終止。
newFixedThreadPool 和cacheThreadPool類似,有可用的執行緒就使用,但不能隨時建新的執行緒。其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待,直到當前的執行緒中某個執行緒終止直接被移出池子。cache池和fixed池呼叫的是同一個底層池,只不過引數不同:fixed池執行緒數固定,並且是0秒IDLE(無IDLE)。所以FixedThreadPool多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器。cache池執行緒數支援0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE。
ScheduledThreadPool 排程型執行緒池。這個池子裡的執行緒可以按schedule依次delay執行,或週期執行。
SingleThreadExecutor 單例執行緒,任意時間池中只能有一個執行緒。用的是和cache池和fixed池相同的底層池,但執行緒數目是1-1,0秒IDLE(無IDLE)。

2、Executors建立執行緒池原始碼

//呼叫newCachedThreadPool方法,可以建立一個緩衝型執行緒池,而在改方法中通過傳參建立一個ThreadPoolExecutor,我麼你會很奇怪明明返回的是一個ExecutorService,怎麼會建立了一個ThreadPoolExecutor呢?
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, 
                   TimeUnit.SECONDS, new SynchronousQueue<Runnable());
}

// ThreadPoolExecutor繼承了抽象的service類AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {}

//AbstractExecutorService實現了ExecutorService介面
public abstract class AbstractExecutorService implements ExecutorService {}

//所以ExecutorService其實是ThreadPoolExecutor的基類,這也就解釋清楚了


3、ExecutorService(執行緒池)

     ExecutorService是一個介面,它繼承了Executor,在原有execute方法的基礎上新增了submit方法,傳入一個任務,該方法能夠返回一個Future物件,可以獲取非同步計算結果。

//ExecutorService繼承了Executor,並擴充套件了新方法。
public interface ExecutorService extends Executor { }

//Executor中的方法
void execute(Runnable command);

//增加了submit方法,該方法傳任務來獲取Future物件,而Future物件中可以獲取任務的執行結果
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);


4、Future(獲取非同步計算結果)

     Future介面中有下表所示方法,可以獲取當前正在執行的任務相關資訊。

方法 說明
boolean cancel(boolean interruptIf) 取消任務的執行
boolean isCancelled() 任務是否已取消,任務正常完成前將其取消,返回 true
boolean isDone() 任務是否已完成,任務正常終止、異常或取消,返回true
V get() 等待任務結束,然後獲取V型別的結果
V get(long timeout, TimeUnit unit) 獲取結果,設定超時時間

5、FutureTask

   Executor框架利用FutureTask來完成非同步任務,並可以用來進行任何潛在的耗時的計算。一般FutureTask多用於耗時的計算,主執行緒可以在完成自己的任務後,再去獲取結果。

   FutureTask包裝了Callable和Runnable介面物件,提供對Future介面的基本實現,開始、取消計算、查詢計算是否完成、獲取計算結果。僅當計算完成時才能檢索結果,當計算沒有完成時,該方法會一直阻塞直到任務轉入完成狀態。一旦完成計算,不能夠重新開始或取消計算。通過Excutor(執行緒池)來執行,也可傳遞給Thread物件執行。如果在主執行緒中需要執行比較耗時的操作時,但又不想阻塞主執行緒時,可以把這些作業交給Future物件在後臺完成,當主執行緒將來需要時,就可以通過Future物件獲得後臺作業的計算結果或者執行狀態。

//通過傳入任務來構造FutureTask
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}

//FutureTask中同樣有獲取當前任務狀態的方法
public boolean isCancelled(){}
public boolean isDone() {}
public boolean cancel(boolean mayInterruptIfRunning) {}

//FutureTask實現RunnableFuture
public class FutureTask<V> implements RunnableFuture<V> {}

//RunnableFuture繼承Runnable和Future
public interface RunnableFuture<V> extends Runnable, Future<V> 


6、示例程式碼

package future.java;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestFuture {
    // 建立執行緒池
    final static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Long t1 = System.currentTimeMillis();

        // 任務1
        Future<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        while (true) {
            if (booleanTask.isDone() && !booleanTask.isCancelled()) {
                //模擬耗時
                Thread.sleep(500);
                Boolean result = booleanTask.get();
                System.err.println("BooleanTask: " + result);
                break;
            }
        }

        // 任務2
        Future<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        while (true) {
            if (stringTask.isDone() && !stringTask.isCancelled()) {
                String result = stringTask.get();
                System.err.println("StringTask: " + result);
                break;
            }
        }



        // 任務3
        Future<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        while (true) {
            if (integerTask.isDone() && !integerTask.isCancelled()) {
                Integer result = integerTask.get();
                System.err.println("IntegerTask: " + result);
                break;
            }
        }

        // 執行時間
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}