Java實現非同步回撥
public interface CallBack { /* 為什麼要寫這個回撥介面呢? *因為可能不止主調A需要用到被調的處理過程,如果很多地方需要用到被調程式 * 那麼傳入被調的方法就不可能只傳主調A類,所以要定義一個介面, * 傳入被調的處理方法的引數就是這個介面物件 * */ public void solve(String result); }主調程式:
public class CallbackRequest implements Callback{ private CallbackResponse callbackResponse; public CallbackRequest(CallbackResponse callbackResponse) { this.callbackResponse = callbackResponse; } //主調需要解決一個問題,所以他把問題交給被調處理,被調單獨建立一個執行緒,不影響主調程式的執行 public void request(final String question){ System.out.println("主調程式問了一個問題"); new Thread(()->{ //B想要幫A處理東西,就必須知道誰讓自己處理的,所以要傳入a,也要知道a想處理什麼,所以要傳入question callbackResponse.handler(this, question); }).start(); //A把要處理的事情交給b之後,就可以自己去玩耍了,或者去處理其他事情 afterAsk(); } private void afterAsk(){ System.out.println("主調程式繼續處理其他事情"); } @Override public void solve(String result) { System.out.println("被調程式接到答案後進行處理" + result); } }
被調程式:
public class CallbackResponse { public void handler(Callback callback, String request) { System.out.println(callback.getClass()+"問的問題是:"+ request); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } String result="\n答案是2"; callback.solve(result); } }
public class CallbackTest { public static void main(String[] args) { CallbackResponse callbackResponse = new CallbackResponse(); CallbackRequest callbackRequest = new CallbackRequest(callbackResponse); callbackRequest.request("1+1"); } }輸出: 主調程式問了一個問題 主調程式繼續處理其他事情 class javapratice.CallbackRequest問的問題是:1+1 被調程式接到答案後進行處理 答案是2 3、非同步回撥 非同步回撥的實現依賴於多執行緒或者多程序。軟體模組之間總是存在著一定的介面,從呼叫方式上,可以把他們分為三類:同步呼叫、回撥和非同步呼叫。同步呼叫是一種阻塞式呼叫,呼叫方要等待對方執行完畢才返回,它是一種單向呼叫;回撥是一種雙向呼叫模式,也就是說,被呼叫方在介面被呼叫時也會呼叫對方的介面;非同步呼叫是一種類似訊息或事件的機制,不過它的呼叫方向剛好相反,介面的服務在收到某種訊息或發生某種事件時,會主動通知客戶方(即呼叫客戶方的介面)。回撥和非同步呼叫的關係非常緊密,通常我們使用回撥來實現非同步訊息的註冊,通過非同步呼叫來實現訊息的通知。 3.1、多執行緒中的“回撥” (JDK8之前) Java多執行緒中可以通過callable和future或futuretask結合來獲取執行緒執行後的返回值。實現方法是通過get方法來呼叫callable的call方法獲取返回值。其實這種方法本質上不是回撥,回撥要求的是任務完成以後被呼叫者主動回撥呼叫者的介面,而這裡是呼叫者主動使用get方法阻塞獲取返回值。一般情況下,我們會結合Callable和Future一起使用,通過ExecutorService的submit方法執行Callable,並返回Future。
//多執行緒中的“回撥” public class CallBackMultiThread { //這裡簡單地使用future和callable實現了執行緒執行完後 public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); Future<String> future = executor.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("call"); TimeUnit.SECONDS.sleep(1); return "str"; } }); //手動阻塞呼叫get通過call方法獲得返回值。 System.out.println(future.get()); //需要手動關閉,不然執行緒池的執行緒會繼續執行。 executor.shutdown(); //使用futuretask同時作為執行緒執行單元和資料請求單元。 FutureTask<Integer> futureTask = new FutureTask(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("dasds"); return new Random().nextInt(); } }); new Thread(futureTask).start(); //阻塞獲取返回值 System.out.println(futureTask.get()); } }
注:比起future.get(),其實更推薦使用get (long timeout, TimeUnit unit)方法,設定了超時時間可以防止程式無限制的等待future的結果。
3.2、Java8中新增的CompletableFuture CompletableFuture在Java裡面被用於非同步程式設計,非同步通常意味著非阻塞,可以使得我們的任務單獨執行在與主執行緒分離的其他執行緒中,並且通過回撥可以在主執行緒中得到非同步任務的執行狀態,是否完成,和是否異常等資訊。CompletableFuture實現了Future, CompletionStage介面,實現了Future介面就可以相容現在有執行緒池框架,而CompletionStage接口才是非同步程式設計的介面抽象,裡面定義多種非同步方法,通過這兩者集合,從而打造出了強大的CompletableFuture類。 Future vs CompletableFuture Futrue在Java裡面,通常用來表示一個非同步任務的引用,比如我們將任務提交到執行緒池裡面,然後我們會得到一個Futrue,在Future裡面有isDone方法來 判斷任務是否處理結束,還有get方法可以一直阻塞直到任務結束然後獲取結果,但整體來說這種方式,還是同步的,因為需要客戶端不斷阻塞等待或者不斷輪詢才能知道任務是否完成。 但是Future的主要缺點如下:- 不支援手動完成:這個意思指的是,我提交了一個任務,但是執行太慢了,我通過其他路徑已經獲取到了任務結果,現在沒法把這個任務結果,通知到正在執行的執行緒,所以必須主動取消或者一直等待它執行完成。
- 不支援進一步的非阻塞呼叫:這個指的是我們通過Future的get方法會一直阻塞到任務完成,但是我還想在獲取任務之後,執行額外的任務,因為Future不支援回撥函式,所以無法實現這個功能。
- 不支援鏈式呼叫:這個指的是對於Future的執行結果,我們想繼續傳到下一個Future處理使用,從而形成一個鏈式的pipline呼叫,這在Future中是沒法實現的。
- 不支援多個Future合併:比如我們有10個Future並行執行,我們想在所有的Future執行完畢之後,執行某些函式,是沒法通過Future實現的。
- 不支援異常處理:Future的API沒有任何的異常處理的api,所以在非同步執行時,如果出了問題是不好定位的。
public class TestCompletableFuture { public static void main(String[] args) throws Exception{ CompletableFuture<String> completableFuture=new CompletableFuture<String>(); Runnable runnable=new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+" 執行....."); completableFuture.complete("success");//在子執行緒中完成主執行緒completableFuture的完成 } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread t1=new Thread(runnable); t1.start();//啟動子執行緒 String result=completableFuture.get();//主執行緒阻塞,等待完成 System.out.println(Thread.currentThread().getName()+" 1x: "+result); } } 輸出結果: Thread-0 執行..... main 1x: success
2、執行一個簡單的沒有返回值的非同步任務
public class TestCompletableFuture { public static void main(String[] args) throws Exception{ CompletableFuture<Void> future=CompletableFuture.runAsync(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"正在執行一個沒有返回值的非同步任務。"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }); future.get(); System.out.println(Thread.currentThread().getName()+" 結束。"); } } 輸出如下: ForkJoinPool.commonPool-worker-1正在執行一個沒有返回值的非同步任務。 main 結束。
從上面程式碼我們可以看到CompletableFuture預設執行使用的是ForkJoin的的執行緒池。當然,你也可以用lambda表示式使得程式碼更精簡。
3,執行一個有返回值的非同步任務public class TestCompletableFuture { public static void main(String[] args) throws Exception{ CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>(){ @Override public String get() { try { System.out.println(Thread.currentThread().getName()+"正在執行一個有返回值的非同步任務。"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "OK"; } }); String result=future.get(); System.out.println(Thread.currentThread().getName()+" 結果:"+result); } } 輸出結果: ForkJoinPool.commonPool-worker-1正在執行一個有返回值的非同步任務。 main 結果:OK
當然,上面預設的都是ForkJoinPool我們也可以換成Executor相關的Pool,其api都有支援如下:
static CompletableFuture<Void> runAsync(Runnable runnable) static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2)高階使用CompletableFuture 上面提到的幾種使用方法是使用非同步程式設計最簡單的步驟,CompletableFuture.get()的方法會阻塞直到任務完成,這其實還是同步的概念,這對於一個非同步系統是不夠的,因為真正的非同步是需要支援回撥函式,這樣以來,我們就可以直接在某個任務幹完之後,接著執行回撥裡面的函式,從而做到真正的非同步概念。在CompletableFuture裡面,通過thenApply(), thenAccept(),thenRun()方法,來執行一個回撥函式。 1、thenApply() 這個方法,其實用過函數語言程式設計的人非常容易理解,類似於scala和spark的map運算元,通過這個方法可以進行多次鏈式轉化並返回最終的加工結果。 看下面一個例子:
public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("執行緒" + Thread.currentThread().getName() + " supplyAsync"); return "123"; } }); CompletableFuture<Integer> result1 = task.thenApply(number->{ System.out.println("執行緒" + Thread.currentThread().getName() + " thenApply1 "); return Integer.parseInt(number); }); CompletableFuture<Integer> result2 = result1.thenApply(number->{ System.out.println("執行緒" + Thread.currentThread().getName() + " thenApply2 "); return number*2; }); System.out.println("執行緒" + Thread.currentThread().getName()+" => "+result2.get()); } public static void main(String[] args) throws Exception{ asyncCallback(); } } 輸出結果: 執行緒ForkJoinPool.commonPool-worker-1 supplyAsync 執行緒main thenApply1 執行緒main thenApply2 執行緒main => 246
2、thenAccept()
這個方法,可以接受Futrue的一個返回值,但是本身不在返回任何值,適合用於多個callback函式的最後一步操作使用。例子如下:public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println(Thread.currentThread().getName()+" supplyAsync"); return "123"; } }); CompletableFuture<Integer> chain1 = task.thenApply(number->{ System.out.println(Thread.currentThread().getName()+" thenApply1"); return Integer.parseInt(number); }); CompletableFuture<Integer> chain2 = chain1.thenApply(number->{ System.out.println(Thread.currentThread().getName()+" thenApply2"); return number*2; }); CompletableFuture<Void> result=chain2.thenAccept(product->{ System.out.println(Thread.currentThread().getName()+" thenAccept="+product); }); result.get(); System.out.println(Thread.currentThread().getName()+" end"); } public static void main(String[] args) throws Exception { asyncCallback(); } } 結果如下: ForkJoinPool.commonPool-worker-1 supplyAsync main thenApply1 main thenApply2 main thenAccept=246 main end3、thenRun() 這個方法與上一個方法類似,一般也用於回撥函式最後的執行,但這個方法不接受回撥函式的返回值,純粹就代表執行任務的最後一個步驟:
public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync: 一階段任務"); return null; }).thenRun(()->{ System.out.println(Thread.currentThread().getName()+"thenRun: 收尾任務"); }).get(); } public static void main(String[] args) throws Exception { asyncCallback(); } } 結果: ForkJoinPool.commonPool-worker-1supplyAsync: 一階段任務 mainthenRun: 收尾任務
這裡注意,截止到目前,前面的例子程式碼只會涉及兩個執行緒,一個是主執行緒一個是ForkJoinPool池的執行緒,但其實上面的每一步都是支援非同步執行的,其api如下:
// thenApply() variants <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
我們看下改造後的一個例子:
public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture<String> ref1= CompletableFuture.supplyAsync(()->{ try { System.out.println(Thread.currentThread().getName() + " supplyAsync開始執行任務1.... "); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " supplyAsync: 任務1"); return null; }); CompletableFuture<String> ref2= CompletableFuture.supplyAsync(()->{ try { } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " thenApplyAsync: 任務2"); return null; }); CompletableFuture<String> ref3=ref2.thenApplyAsync(value->{ System.out.println(Thread.currentThread().getName() +" thenApplyAsync: 任務2的子任務"); return " finish"; }); Thread.sleep(4000); System.out.println(Thread.currentThread().getName() + ref3.get()); } public static void main(String[] args) throws Exception { asyncCallback(); } } 輸出結果如下: ForkJoinPool.commonPool-worker-1 supplyAsync開始執行任務1.... ForkJoinPool.commonPool-worker-2 thenApplyAsync: 任務2 ForkJoinPool.commonPool-worker-2 thenApplyAsync: 任務2的子任務 ForkJoinPool.commonPool-worker-1 supplyAsync: 任務1 main finish
我們可以看到,ForkJoin池的執行緒1,執行了前面的三個任務,但是第二個任務的子任務,因為我們了使用也非同步提交所以它用的執行緒是ForkJoin池的執行緒2,最終由於main執行緒處執行了get是最後結束的。
還有一點需要注意: ForkJoinPool所有的工作執行緒都是守護模式的,也就是說如果主執行緒退出,那麼整個處理任務都會結束,而不管你當前的任務是否執行完。如果需要主執行緒等待結束,可採用ExecutorsThreadPool,如下:ExecutorService pool = Executors.newFixedThreadPool(5); final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { ... }, pool);
4、thenCompose():合併兩個有依賴關係的CompletableFutures的執行結果
CompletableFutures在執行兩個依賴的任務合併時,會返回一個巢狀的結果列表,為了避免這種情況我們可以使用thenCompose來返回,直接獲取最頂層的結果資料即可:public class TestCompletableFuture { public static void asyncCompose() throws ExecutionException, InterruptedException { CompletableFuture<String> future1=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "1"; } }); CompletableFuture<String>nestedResult = future1.thenCompose(value-> CompletableFuture.supplyAsync(()->{ return value+"2"; })); System.out.println(nestedResult.get()); } public static void main(String[] args) throws Exception { asyncCompose(); } } 輸出結果:125、thenCombine:組合兩個沒有依賴關係的CompletableFutures任務
public class TestCompletableFuture { public static void asyncCombine() throws ExecutionException, InterruptedException { CompletableFuture<Double> d1= CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return 1d; } }); CompletableFuture<Double> d2= CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return 2d; } }); CompletableFuture<Double> result= d1.thenCombine(d2,(number1,number2)->{ return number1+number2; }); System.out.println(result.get()); } public static void main(String[] args) throws Exception { asyncCombine(); } } 輸出結果:3d
6、合併多個任務的結果allOf與anyOf
上面說的是兩個任務的合併,那麼多個任務需要使用allOf或者anyOf方法。allOf適用於,你有一系列獨立的future任務,你想等其所有的任務執行完後做一些事情。舉個例子,比如我想下載100個網頁,傳統的序列,效能肯定不行,這裡我們採用非同步模式,同時對100個網頁進行下載,當所有的任務下載完成之後,我們想判斷每個網頁是否包含某個關鍵詞。 下面我們通過隨機數來模擬上面的這個場景如下:public class TestCompletableFuture { public static void mutilTaskTest() throws ExecutionException, InterruptedException { //新增n個任務 CompletableFuture<Double> array[]=new CompletableFuture[3]; for ( int i = 0; i < 3; i++) { array[i]=CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return Math.random(); } }); } //獲取結果的方式一 // CompletableFuture.allOf(array).get(); // for(CompletableFuture<Double> cf:array){ // if(cf.get()>0.6){ // System.out.println(cf.get()); // } // } //獲取結果的方式二,過濾大於指定數字,在收集輸出 List<Double> rs= Stream.of(array).map(CompletableFuture::join).filter(number->number>0.6).collect(Collectors.toList()); System.out.println(rs); } public static void main(String[] args) throws Exception { mutilTaskTest(); } } 結果如下(結果可能不一致): [0.85538057702618, 0.7692532053269862, 0.6441387373310598]
注意其中的join方法和get方法類似,僅僅在於在Future不能正常完成的時候丟擲一個unchecked的exception,這可以確保它用在Stream的map方法中,直接使用get是沒法在map裡面執行的。
anyOf方法,也比較簡單,意思就是隻要在多個future裡面有一個返回,整個任務就可以結束,而不需要等到每一個future結束。public class TestCompletableFuture { public static void mutilTaskTest() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "wait 4 seconds"; } }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "wait 2 seconds"; } }); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "wait 10 seconds"; } }); CompletableFuture<Object> result = CompletableFuture.anyOf(f1, f2, f3); System.out.println(result.get()); } public static void main(String[] args) throws Exception { mutilTaskTest(); } } 輸出結果: wait 2 seconds
注意由於Anyof返回的是其中任意一個Future所以這裡沒有明確的返回型別,統一使用Object接受,留給使用端處理。
7、exceptionally異常處理 異常處理是非同步計算的一個重要環節,下面看看如何在CompletableFuture中使用:public class TestCompletableFuture { public static void exceptionProcess() throws ExecutionException, InterruptedException { int age=-1; CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>(){ @Override public String get(){ if(age<0){ throw new IllegalArgumentException("性別必須大於0"); } if(age<18){ return "未成年人"; } return "成年人"; } }).exceptionally(ex->{ System.out.println(ex.getMessage()); return "發生 異常"+ex.getMessage(); }); System.out.println(task.get()); } public static void main(String[] args) throws Exception { exceptionProcess(); } } 結果如下: java.lang.IllegalArgumentException: 性別必須大於0 發生 異常java.lang.IllegalArgumentException: 性別必須大於0
此外還有另外一種異常捕捉方法handle,無論發生異常都會執行,示例如下:
public class TestCompletableFuture { public static void exceptionProcess() throws ExecutionException, InterruptedException { int age = -10; CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>(){ @Override public String get(){ if(age<0){ throw new IllegalArgumentException("性別必須大於0"); } if(age<18){ return "未成年人"; } return "成年人"; } }).handle((res,ex)->{ System.out.println("執行handle"); if(ex!=null){ System.out.println("發生異常"); return "發生 異常"+ex.getMessage(); } return res; }); System.out.println(task.get()); } public static void main(String[] args) throws Exception { exceptionProcess(); } } 輸出結果: 執行handle 發生異常 發生 異常java.lang.IllegalArgumentException: 性別必須大於0
注意上面的方法如果正常執行,也會執行handle方法。
3.3、JDK9 CompletableFuture 類增強的主要內容 (1)支援對非同步方法的超時呼叫- orTimeout()
- completeOnTimeout()
- Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
- Executor delayedExecutor(long delay, TimeUnit unit)