Callable和並行設計模式——Future模式
目錄
在Java中,建立執行緒一般有兩種方式,一種是繼承Thread類,一種是實現Runnable介面。然而,這兩種方式的缺點是線上程任務執行結束後,無法獲取執行結果。一般只能採用共享變數或共享儲存區以及執行緒通訊的方式實現獲得任務結果,也可以使用回撥的方法:
package com.thread.myfuture; interface Callable { void call(int num); } public class FutureTest { public static void main(String[] args) { System.out.println("主執行緒開始"); final Callable callable = new Callable() { @Override public void call(int num) { System.out.println("執行緒執行結果值 num=="+num); } }; new Thread(new Runnable() { @Override public void run() { System.out.println("執行緒"+Thread.currentThread().getName()+" 開始"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } callable.call(100); System.out.println("執行緒"+Thread.currentThread().getName()+" 結束"); } }, "t1").start(); System.out.println("主執行緒結束"); } }
結果:
使用回撥方法的缺點:
- 必須要建立回撥介面。而且執行緒執行中可能產生異常,那麼回撥介面至少包含成功回撥和錯誤回撥兩個方法。
- 當執行緒執行後,只能等到執行緒回撥介面,本身我們沒有辦法進行取消操作。
- 如果要重複獲取同樣執行緒執行結果的值,還是隻能重新執行執行緒。當然你也可以使用一個變數快取結果值。
一、Callable
1.1 介紹
可以使用Callable和Future來實現獲取任務結果的操作。Callable用來執行任務,產生結果,而Future用來獲得結果。
Callable介面與Runnable介面是否相似,檢視原始碼,可知Callable介面的定義如下:
可以看到,與Runnable介面不同之處在於,call方法帶有泛型返回值V。
1.2 Future常用方法
V get() :獲取非同步執行的結果,如果沒有結果可用,此方法會阻塞直到非同步計算完成。
V get(Long timeout , TimeUnit unit) :獲取非同步執行結果,如果沒有結果可用,此方法會阻塞,但是會有時間限制,如果阻塞時間超過設定的timeout時間,該方法將丟擲異常。
boolean isDone() :如果任務執行結束,無論是正常結束或是中途取消還是發生異常,都返回true。
boolean isCanceller() :如果任務完成前被取消,則返回true。
boolean cancel(boolean mayInterruptRunning) :
如果任務還沒開始,執行cancel(...)方法將返回false;
如果任務已經啟動,執行cancel(true)方法將以中斷執行此任務執行緒的方式來試圖停止任務,如果停止成功,返回true;
當任務已經啟動,執行cancel(false)方法將不會對正在執行的任務執行緒產生影響(讓執行緒正常執行到完成),此時返回false;
當任務已經完成,執行cancel(...)方法將返回false。mayInterruptRunning引數表示是否中斷執行中的執行緒。
實際上Future提供了3種功能:
(1)能夠中斷執行中的任務
(2)判斷任務是否執行完成
(3)獲取任務執行完成後的結果。
1.3 Demo
package com.thread.pool;
import java.util.concurrent.*;
/**
* @Author: 98050
* @Time: 2018-12-07 21:05
* @Feature: Callable
*/
public class Test007 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println("主執行緒開始執行");
final Future<String> submit = executorService.submit(new TaskCallable());
String result = submit.get();
System.out.println("執行結果:"+result);
executorService.shutdown();
}
}
class TaskCallable implements Callable<String>{
public String call() throws Exception {
System.out.println("正在執行任務,需要等待五秒,開始執行");
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+i);
Thread.sleep(1000);
}
System.out.println("執行結束");
return "success";
}
}
結果:
1.4 Runnable和Callable的區別
(1)Callable規定的方法是call(),Runnable規定的方法是run().
(2)Callable的任務執行後可返回值,而Runnable的任務是不能返回值
(3)call方法可以丟擲異常,run方法不可以
(4)執行Callable任務可以拿到一個Future物件,Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。計算完成後只能使用 get 方法來獲取結果,如果執行緒沒有執行完,Future.get()方法可能會阻塞當前執行緒的執行;如果執行緒出現異常,Future.get()會throws InterruptedException或者ExecutionException;如果執行緒已經取消,會跑出CancellationException。取消由cancel 方法來執行。isDone確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future 但又不提供可用的結果,則可以宣告Future<?> 形式型別、並返回 null 作為底層任務的結果。
二、Future模式
2.1 介紹
Future模式的核心在於:去除了主函式的等待時間,並使得原本需要等待的時間段可以用於處理其他業務邏輯
Futrure模式:對於多執行緒,如果執行緒A要等待執行緒B的結果,那麼執行緒A沒必要等待B,直到B有結果,可以先拿到一個未來的Future,等B有結果是再取真實的結果。
例子:網路圖片的下載。剛開始是通過模糊的圖片來代替最後的圖片,等下載圖片的執行緒下載完圖片後在替換。而在這個過程中可以做一些其他的事情。
首先客戶端向伺服器請求RealSubject,但是這個資源的建立是非常耗時的,怎麼辦呢?這種情況下,首先返回Client一個FutureSubject,以滿足客戶端的需求,於此同時呢,Future會通過另外一個Thread 去構造一個真正的資源,資源準備完畢之後,在給future一個通知。如果客戶端急於獲取這個真正的資源,那麼就會阻塞客戶端的其他所有執行緒,等待資源準備完畢。
2.2 實現
參 與 者 |
作 用 |
Main | 系統啟動,呼叫Client發出請求 |
Client | 返回Data物件,立即返回FutureData,並開啟ClientThread執行緒裝配RealData |
Data | 返回資料的介面 |
FutureData | Future資料,構造很快,但是是一個虛擬的資料,需要裝配RealData |
RealData | 真實資料,其構造是比較慢的 |
2.2.1 公共資料介面
package com.thread.myfuture;
/**
* @Author: 98050
* @Time: 2018-12-07 23:05
* @Feature:
*/
public interface Data {
/**
* 獲取請求
* @return
*/
String getRequest();
}
2.2.2 RealData
在這裡面實現正在的資料讀取,然後儲存在result中進行返回。
package com.thread.myfuture;
/**
* @Author: 98050
* @Time: 2018-12-07 23:07
* @Feature:
*/
public class RealData implements Data {
private String result;
public RealData(String data) throws InterruptedException {
System.out.println("正在使用data:"+data+"網路請求資料,耗時需要等待");
Thread.sleep(3000);
System.out.println("讀取資料成功,獲取結果。。。。。");
result = "success";
}
@Override
public String getRequest() {
return result;
}
}
2.2.3 FutureData
主要功能是當有執行緒想要獲取RealData的時候,程式就會被阻塞,直到RealData被注入後才能使用getRequest方法,注意在getRequest方法中,通過realDara.getRequest()來獲取真正的資料。
package com.thread.myfuture;
/**
* @Author: 98050
* @Time: 2018-12-07 23:06
* @Feature:
*/
public class FutureData implements Data {
public volatile static boolean FLAG = false;
private RealData realData;
public synchronized void setRealData(RealData realData){
//如果已經獲取到結果,直接返回
if (FLAG){
return;
}
//如果沒有獲取到資料,傳遞真實物件
this.realData = realData;
FLAG = true;
notify();
}
@Override
public synchronized String getRequest() {
while (!FLAG){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//獲取到資料,直接返回
return realData.getRequest();
}
}
2.2.4 FutureClient
主要作用就是往futureData中注入realData。
package com.thread.myfuture;
/**
* @Author: 98050
* @Time: 2018-12-07 23:15
* @Feature:
*/
public class FutureClient {
public Data request(final String queryStr){
final FutureData futureData = new FutureData();
new Thread(new Runnable() {
public void run() {
try {
RealData realData = new RealData(queryStr);
futureData.setRealData(realData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
return futureData;
}
}
2.2.5 呼叫者
package com.thread.myfuture;
/**
* @Author: 98050
* @Time: 2018-12-07 23:17
* @Feature:
*/
public class Test {
public static void main(String[] args) {
FutureClient futureClient = new FutureClient();
final Data request = futureClient.request("請求引數........");
System.out.println("請求傳送成功!");
String result = request.getRequest();
System.out.println("獲取到的結果:"+result);
System.out.println("繼續執行其它任務");
}
}
2.2.6 結果
三、Java內建Future模式實現
由於Future是非常常用的多執行緒設計模式,因此在JDK中內建了Future模式的實現。這些類在java.util.concurrent包裡面。其中最為重要的是FutureTask類,它實現了Runnable介面,作為單獨的執行緒執行。在其run()方法中,通過Sync內部類呼叫Callable介面,並維護Callable介面的返回物件。當使用FutureTask.get()方法時,將返回Callable介面的返回物件。其核心結構圖如下所示:
3.1 修改RealData
實現Callable介面,實現具體的業務邏輯。
package com.thread.future;
import java.util.concurrent.Callable;
/**
* @Author: 98050
* @Time: 2018-12-07 23:07
* @Feature:
*/
public class RealData implements Callable<String> {
private String data;
public RealData(String data) {
this.data = data;
}
@Override
public String call() throws Exception {
System.out.println("正在使用:"+data+"........請求資料,耗時等待中");
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+i);
Thread.sleep(1000);
}
return "success";
}
}
3.2 修改Main方法
由於使用了JDK的內建框架,Data、FutureData等物件就不再需要了。在Main方法的實現中,直接通過RealData構造FutureTask,並將其作為單獨的執行緒執行。在提交請求後,執行其他業務邏輯,最後通過FutureTask.get()方法,得到RealData的執行結果。
package com.thread.future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
* @Author: 98050
* @Time: 2018-12-07 23:17
* @Feature:
*/
public class Test {
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<String>(new RealData("引數"));
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(futureTask);
System.out.println("請求完畢");
try {
//執行其它工作
System.out.println("開始執行其它任務,耗時3秒");
for (int i = 0; i < 3; i++) {
System.out.println(Thread.currentThread().getName()+i);
Thread.sleep(1000);
}
System.out.println("請求結果:"+futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
}