基於介面回撥詳解JUC中Callable和FutureTask實現原理
Callable介面和FutureTask實現類,是JUC(Java Util Concurrent)包中很重要的兩個技術實現,它們使獲取多執行緒執行結果成為可能。它們底層的實現,就是基於介面回撥技術。介面回撥,許多程式設計師都耳熟能詳,這種技術被廣泛應用於非同步模組的開發中。它的實現原理並不複雜,但是對初學者來說卻並不友好,其中的一個原因是它的使用場景和處理手段,對習慣了單執行緒開發的初學者來說有點繞。而各種文章或書籍,在解釋這一個問題的時候,往往忽視了使用場景,而舉一些小明坐車、A和B等等的例子,初學者看完之後往往更迷糊。
本文立足於此,就從多執行緒中執行緒結果獲取這一需求場景出發,逐步說明介面回撥及其在JUC中的應用。
執行緒結果獲取
習慣了單執行緒開發的程式設計師,在非同步程式設計中最難理解的一點,就是如何從執行緒執行結果返回資訊,因為run和start方法本身是沒有返回值的。一個基本的方法是,使用一個變數暫存執行結果,另外提供一個公共方法來返回這個變數。實現程式碼如下:
1 /* 2 * 設計可以返回執行結果的執行緒 3 * 定義一個執行緒讀取檔案內容, 使用字串存取結果並返回主執行緒 4 */ 5 public class ReturnDigestTest extends Thread{ 6 //定義檔名 7 private String fileName; 8 //定義一個字串物件result, 用於存取執行緒執行結果 9 private String result; 10 11 public ReturnDigestTest(String fileName) { 12 this.fileName = fileName; 13 } 14 //run方法中讀取本目錄下檔案, 並存儲至result 15 @Override 16 public void run() { 17 try (FileInputStream fis = new FileInputStream(fileName)){18 byte[] buffer = new byte[1024]; 19 int hasRead = 0; 20 while ((hasRead = fis.read(buffer)) > 0) { 21 result = new String(buffer, 0, hasRead); 22 } 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 //定義返回result結果的方法 28 public String getResult() { 29 return result; 30 } 31 public static void main(String[] args) throws InterruptedException { 32 //測試, 在子執行緒中執行讀取檔案, 主執行緒返回 33 ReturnDigestTest returnDigestTest = new ReturnDigestTest("test.txt"); 34 returnDigestTest.start(); 35 //以下結果返回null. 因為getResult方法執行的時候, 子執行緒可能還沒結束 36 System.out.println(returnDigestTest.getResult()); 37 } 38 }
執行結果會輸出一個null,原因在於讀取檔案的執行緒需要執行時間,所以很可能到主執行緒呼叫getResult方法的時候,子執行緒還沒結束,結果就為null了。
如果在上面程式碼第35行,增加TimeUnit.SECONDS.sleep(5); 使主執行緒休眠5秒鐘,你會發現結果正確返回。
競態條件
在多執行緒環境下的實際開發場景中,更為常見的情形是,業務執行緒需要不斷迴圈獲取多個執行緒執行的返回結果。如果按照上述思路開發,那可能的結果為null,也可能導致程式掛起。上述方法是否成功,取決於競態條件(Race Condition),包括執行緒數、CPU數量、CPU運算速度、磁碟讀取速度、JVM執行緒排程演算法。
輪詢
作為對上述方法的一個優化,可以讓主執行緒定期詢問返回狀態,直到結果非空在進行獲取,這就是輪詢的思路。沿用上面的例子,只需要把36行修改如下即可:
1 //使用輪詢, 判斷執行緒返回結果是否為null 2 while (true) { 3 if (returnDigestTest.getResult() != null) { 4 System.out.println(returnDigestTest.getResult()); 5 break; 6 } 7 }
但是,這個方法仍然不具有普適性,在有些JVM,主執行緒會佔用幾乎所有執行時間,而導致子執行緒無法完成工作。
即便不考慮這個因素,這個方法仍然不理想,它使得CPU執行時間被額外佔用了。就好像一個搭公交的小孩,每一站都在問:請問到站了嗎?因此,比較理想的方法,是讓子執行緒在它完成任務後,通知主執行緒,這就是回撥方法。
介面回撥的應用
在非同步程式設計中,回撥的意思是,一個執行緒在執行中或完畢後,通知另外一個執行緒,返回一些訊息。而介面回撥,則是充分利用了Java多型的特徵,使用介面作為回撥方法的引用。
使用介面回撥技術來優化上面的問題,可以設計一個實現Runnable介面的類,一個回撥方法的介面,以及一個回撥方法介面的實現類(main方法所在類),具體實現如下
實現Runnable的類
1 /* 2 * 使用介面回撥, 實現執行緒執行結果的返回 3 */ 4 public class CallbackDigest implements Runnable{ 5 private String fileName; 6 private String result; 7 //定義回撥方法介面的引用 8 private CallbackUserInterface cui; 9 public CallbackDigest(String fileName, CallbackUserInterface cui) { 10 this.fileName = fileName; 11 this.cui = cui; 12 } 13 @Override 14 public void run() { 15 try (FileInputStream fis = new FileInputStream(fileName)){ 16 byte[] buffer = new byte[1024]; 17 int hasRead = 0; 18 while((hasRead = fis.read(buffer)) > 0) { 19 result = new String(buffer, 0, hasRead); 20 } 21 //通過回撥介面引用, 呼叫了receiveResult方法, 可以在主執行緒中返回結果. 22 //此處利用了多型 23 cui.receiveResult(result, fileName); 24 } catch (IOException e) { 25 e.printStackTrace(); 26 } 27 } 28 }
回撥方法介面
1 public interface CallbackUserInterface { 2 //只定義了回撥方法, 傳入一個待讀取的檔名引數, 和返回結果 3 public void receiveResult(String result, String fileName); 4 }
回撥方法介面實現類
1 public class CallbackTest implements CallbackUserInterface { 2 //實現回撥方法 3 @Override 4 public void receiveResult(String result, String fileName) { 5 System.out.println("檔案" + fileName + "的內容是: \n" + result); 6 } 7 8 public static void main(String[] args) { 9 //新建回撥介面引用, 指向實現類的物件 10 CallbackUserInterface test = new CallbackTest(); 11 new Thread(new CallbackDigest("test.txt", test)).start(); 12 } 13 }
介面回撥的技術主要有4個關鍵點:
1. 發出資訊的執行緒類:定義回撥方法介面的引用,在構造方法中初始化。
2. 發出資訊的執行緒類:使用回撥方法介面的引用, 來呼叫回撥方法。
3. 收取資訊的執行緒類:實現回撥介面,新建回撥介面的引用,指向該類的物件。
4. 發出資訊的執行緒類:新建執行緒類物件是,傳入3中新建的實現類物件。
Callable和FutureTask的使用
Callable的底層實現類似於一個回撥介面,而FutureTask類似於本例子中讀取檔案內容的執行緒實現類。因為FutureTask實現了Runnable介面,所以它的實現類是可以多執行緒的,而內部就是呼叫了Callable介面實現類的回撥方法,從而實現執行緒結果的返回機制。demo程式碼如下:
1 public class TestCallable implements Callable<Integer>{ 2 //實現Callable並重寫call方法作為執行緒執行體, 並設定返回值1 3 @Override 4 public Integer call() throws Exception { 5 System.out.println("Thread is running..."); 6 Thread.sleep(3000); 7 return 1; 8 } 9 10 public static void main(String[] args) throws InterruptedException, ExecutionException { 11 //建立Callable實現類的物件 12 TestCallable tc = new TestCallable(); 13 //建立FutureTask類的物件 14 FutureTask<Integer> task = new FutureTask<>(tc); 15 //把FutureTask實現類物件作為target,通過Thread類物件啟動執行緒 16 new Thread(task).start(); 17 System.out.println("do something else..."); 18 //通過get方法獲取返回值 19 Integer integer = task.get(); 20 System.out.println("The thread running result is :" + integer); 21 } 22 }