基於接口回調詳解JUC中Callable和FutureTask實現原理
Callable接口和FutureTask實現類,是JUC(Java Util Concurrent)包中很重要的兩個技術實現,它們使獲取多線程運行結果成為可能。它們底層的實現,就是基於接口回調技術。接口回調,許多程序員都耳熟能詳,這種技術被廣泛應用於異步模塊的開發中。它的實現原理並不復雜,但是對初學者來說卻並不友好,其中的一個原因是它的使用場景和處理手段,對習慣了單線程開發的初學者來說有點繞。而各種文章或書籍,在解釋這一個問題的時候,往往忽視了使用場景,而舉一些小明坐車、A和B等等的例子,初學者看完之後往往更迷糊。
本文立足於此,就從多線程中線程結果獲取這一需求場景出發,逐步說明接口回調及其在JUC中的應用。
需要了解Java多線程的底層運行機制,可以看這一篇:基於JVM原理、JMM模型和CPU緩存模型深入理解Java並發編程
線程結果獲取
習慣了單線程開發的程序員,在異步編程中最難理解的一點,就是如何從線程運行結果返回信息,因為run和start方法本身是沒有返回值的。一個基本的方法是,使用一個變量暫存運行結果,另外提供一個公共方法來返回這個變量。實現代碼如下:
1 /* 2 * 設計可以返回運行結果的線程 3 * 定義一個線程讀取文件內容, 使用字符串存取結果並返回主線程 4 */ 5 public class ReturnDigestTest extends Thread{ 6//定義文件名 7 private String fileName; 8 //定義一個字符串對象result, 用於存取線程執行結果 9 private String result; 10 11 public ReturnDigestTest(String fileName) { 12 this.fileName = fileName; 13 } 14 //run方法中讀取本目錄下文件, 並存儲至result 15 @Override 16 public void run() { 17 try(FileInputStream fis = new FileInputStream(fileName)){ 18 byte[] buffer = new byte[1024]; 19 int hasRead = 0; 20 while ((hasRead = fis.read(buffer)) > 0) { 21 result = new String(buffer, 0, hasRead); 22 } 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 //定義返回result結果的方法 28 public String getResult() { 29 return result; 30 } 31 public static void main(String[] args) throws InterruptedException { 32 //測試, 在子線程中執行讀取文件, 主線程返回 33 ReturnDigestTest returnDigestTest = new ReturnDigestTest("test.txt"); 34 returnDigestTest.start(); 35 //以下結果返回null. 因為getResult方法執行的時候, 子線程可能還沒結束 36 System.out.println(returnDigestTest.getResult()); 37 } 38 }
運行結果會輸出一個null,原因在於讀取文件的線程需要執行時間,所以很可能到主線程調用getResult方法的時候,子線程還沒結束,結果就為null了。
如果在上面代碼第35行,增加TimeUnit.SECONDS.sleep(5); 使主線程休眠5秒鐘,你會發現結果正確返回。
競態條件
在多線程環境下的實際開發場景中,更為常見的情形是,業務線程需要不斷循環獲取多個線程運行的返回結果。如果按照上述思路開發,那可能的結果為null,也可能導致程序掛起。上述方法是否成功,取決於競態條件(Race Condition),包括線程數、CPU數量、CPU運算速度、磁盤讀取速度、JVM線程調度算法。
輪詢
作為對上述方法的一個優化,可以讓主線程定期詢問返回狀態,直到結果非空在進行獲取,這就是輪詢的思路。沿用上面的例子,只需要把36行修改如下即可:
1 //使用輪詢, 判斷線程返回結果是否為null 2 while (true) { 3 if (returnDigestTest.getResult() != null) { 4 System.out.println(returnDigestTest.getResult()); 5 break; 6 } 7 }
但是,這個方法仍然不具有普適性,在有些JVM,主線程會占用幾乎所有運行時間,而導致子線程無法完成工作。
即便不考慮這個因素,這個方法仍然不理想,它使得CPU運行時間被額外占用了。就好像一個搭公交的小孩,每一站都在問:請問到站了嗎?因此,比較理想的方法,是讓子線程在它完成任務後,通知主線程,這就是回調方法。
接口回調的應用
在異步編程中,回調的意思是,一個線程在執行中或完畢後,通知另外一個線程,返回一些消息。而接口回調,則是充分利用了Java多態的特征,使用接口作為回調方法的引用。
使用接口回調技術來優化上面的問題,可以設計一個實現Runnable接口的類,一個回調方法的接口,以及一個回調方法接口的實現類(main方法所在類),具體實現如下
實現Runnable的類
1 /* 2 * 使用接口回調, 實現線程執行結果的返回 3 */ 4 public class CallbackDigest implements Runnable{ 5 private String fileName; 6 private String result; 7 //定義回調方法接口的引用 8 private CallbackUserInterface cui; 9 public CallbackDigest(String fileName, CallbackUserInterface cui) { 10 this.fileName = fileName; 11 this.cui = cui; 12 } 13 @Override 14 public void run() { 15 try (FileInputStream fis = new FileInputStream(fileName)){ 16 byte[] buffer = new byte[1024]; 17 int hasRead = 0; 18 while((hasRead = fis.read(buffer)) > 0) { 19 result = new String(buffer, 0, hasRead); 20 } 21 //通過回調接口引用, 調用了receiveResult方法, 可以在主線程中返回結果. 22 //此處利用了多態 23 cui.receiveResult(result, fileName); 24 } catch (IOException e) { 25 e.printStackTrace(); 26 } 27 } 28 }
回調方法接口
1 public interface CallbackUserInterface { 2 //只定義了回調方法, 傳入一個待讀取的文件名參數, 和返回結果 3 public void receiveResult(String result, String fileName); 4 }
回調方法接口實現類
1 public class CallbackTest implements CallbackUserInterface { 2 //實現回調方法 3 @Override 4 public void receiveResult(String result, String fileName) { 5 System.out.println("文件" + fileName + "的內容是: \n" + result); 6 } 7 8 public static void main(String[] args) { 9 //新建回調接口引用, 指向實現類的對象 10 CallbackUserInterface test = new CallbackTest(); 11 new Thread(new CallbackDigest("test.txt", test)).start(); 12 } 13 }
接口回調的技術主要有4個關鍵點:
1. 發出信息的線程類:定義回調方法接口的引用,在構造方法中初始化。
2. 發出信息的線程類:使用回調方法接口的引用, 來調用回調方法。
3. 收取信息的線程類:實現回調接口,新建回調接口的引用,指向該類的對象。
4. 發出信息的線程類:新建線程類對象是,傳入3中新建的實現類對象。
Callable和FutureTask的使用
Callable的底層實現類似於一個回調接口,而FutureTask類似於本例子中讀取文件內容的線程實現類。因為FutureTask實現了Runnable接口,所以它的實現類是可以多線程的,而內部就是調用了Callable接口實現類的回調方法,從而實現線程結果的返回機制。demo代碼如下:
1 public class TestCallable implements Callable<Integer>{ 2 //實現Callable並重寫call方法作為線程執行體, 並設置返回值1 3 @Override 4 public Integer call() throws Exception { 5 System.out.println("Thread is running..."); 6 Thread.sleep(3000); 7 return 1; 8 } 9 10 public static void main(String[] args) throws InterruptedException, ExecutionException { 11 //創建Callable實現類的對象 12 TestCallable tc = new TestCallable(); 13 //創建FutureTask類的對象 14 FutureTask<Integer> task = new FutureTask<>(tc); 15 //把FutureTask實現類對象作為target,通過Thread類對象啟動線程 16 new Thread(task).start(); 17 System.out.println("do something else..."); 18 //通過get方法獲取返回值 19 Integer integer = task.get(); 20 System.out.println("The thread running result is :" + integer); 21 } 22 }
基於接口回調詳解JUC中Callable和FutureTask實現原理