執行器中分離任務載入和結果處理
執行器中分離任務載入和結果處理
通常情況,當使用執行器執行併發任務時,會發送Runnable或Callable任務到執行器並且獲得Future物件來控制方法。會出現需要在一個物件中傳送任務到執行器,且在另一個物件中處理結果的局面。Java併發API提供的CompletionService類用來應對這種狀況的發生。
CompletionService類提供傳送任務到執行器的方法,以及為已經完成執行的下一個任務獲得Future物件。此類內部使用Executor物件執行任務,這鐘特性的優點是共享CompletionService物件且傳送任務到執行器以便其他任務能夠處理結果。不足之處則是第二個物件只能為已經完成執行的任務獲得Future物件,所以這些Future物件只能用來得到任務結果。
本節中,學習如何使用CompletionService類將執行器中載入任務過程與任務結果的處理分開。
準備工作
本範例通過Eclipse開發工具實現。如果使用諸如NetBeans的開發工具,開啟並建立一個新的Java專案。
實現過程
通過如下步驟完成範例:
-
建立名為ReportGenerator的類,實現String類引數化的Callable介面:
public class ReportGenerator implements Callable<String>{
-
定義名為sender和title的兩個私有String屬性,用來定義報告內容:
private final String sender; private final String title;
-
實現類建構函式,初始化兩個屬性:
public ReportGenerator(String sender, String title) { this.sender = sender; this.title = title; }
-
實現call()方法。首先,設定執行緒隨機休眠一段時間:
@Override public String call() throws Exception { try{ Long duration = (
-
然後用sender和title屬性內容生成字串的報告內容,並返回內容:
String ret = sender + " : " + title; return ret; }
-
建立名為ReportRequest的類,實現Runnable介面,用來模擬一些報告請求:
public class ReportRequest implements Runnable{
-
定義名為name的私有String屬性,儲存ReportRequest的名稱:
private final String name;
-
定義名為service的私有CompletionService屬性,CompletionService介面是String類引數化的介面:
private final CompletionService<String> service;
-
實現類建構函式,初始化兩個屬性:
public ReportRequest(String name, CompletionService<String> service) { this.name = name; this.service = service; }
-
實現run()方法,建立三個ReportGenerator物件,使用submit()方法傳送到CompletionService物件:
@Override public void run() { ReportGenerator reportGenerator = new ReportGenerator(name, "Report"); service.submit(reportGenerator); }
-
建立名為ReportProcessor的類,此類將得到ReportGenerator任務的結果,指定其實現Runnable介面:
public class ReportProcessor implements Runnable{
-
定義名為service的私有CompletionService屬性,因為CompletionService介面是引數化介面,所以使用String類作為CompletionService介面的引數:
private final CompletionService<String> service;
-
定義名為end的私有Boolean屬性,新增volatile 關鍵字確保所有執行緒擁有此屬性實際值的使用權:
private volatile boolean end;
-
定義類建構函式,初始化這兩個屬性:
public ReportProcessor(CompletionService<String> service) { this.service = service; end = false; }
-
實現run()方法,當end屬性是false時,呼叫CompletionService介面的poll()方法,獲得下一個任務的Future物件,此任務被已經結束的完成服務執行:
@Override public void run() { while (!end){ try { Future<String> result = service.poll(20, TimeUnit.SECONDS);
-
然後,使用Future物件的get()方法得到任務結果,並輸出結果到控制檯:
if(result != null) { String report = result.get(); System.out.printf("ReportReceiver : Report Received : %s\n", report); } } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.printf("ReportSender : End\n"); }
-
實現stopProcessing()方法來改變end屬性值:
public void stopProcessing(){ this.end = true; }
-
實現範例的主方法,建立一個包含main()方法的Main類:
public class Main { public static void main(String[] args) {
-
使用Executors類的newCachedThreadPool()方法建立ThreadPoolExecutor:
ExecutorService executor = Executors.newCachedThreadPool();
-
使用執行器建立CompletionService,此執行器作為前面的建構函式的引數被建立:
CompletionService<String> service = new ExecutorCompletionService<>(executor);
-
建立兩個ReportRequest物件,以及執行它們的執行緒:
ReportRequest faceRequest = new ReportRequest("Face", service); ReportRequest onlineRequest = new ReportRequest("Online", service); Thread faceThread = new Thread(faceRequest); Thread onlineThread = new Thread(onlineRequest);
-
建立ReportProcessor物件,以及執行它的執行緒:
ReportProcessor processor = new ReportProcessor(service); Thread senderThread = new Thread(processor);
-
啟動三個執行緒:
System.out.printf("Main : Starting the Threads\n"); faceThread.start(); onlineThread.start(); senderThread.start();
-
等待ReportRequest執行緒的結束:
try { System.out.printf("Main : Waiting for the report generators.\n"); faceThread.join(); onlineThread.join(); } catch (InterruptedException e) { e.printStackTrace(); }
-
使用shutdown()方法結束執行器,然後使用awaitTermination()方法等待任務的結束:
System.out.printf("Main : Shutting down the executor.\n"); executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
-
結束ReportSender物件的執行,設定end屬性值為true:
processor.stopProcessing(); System.out.println("Main : Ends");
工作原理
本範例主類中,使用Executors類的newCachedThreadPool()方法建立ThreadPoolExecutor。然後,用到Executor物件初始化CompletionService物件,因為完成服務使用執行器執行其任務。為了使用完成服務執行任務,用到ReportRequest類中的submit()方法。
當完成服務結束其執行時,其中一個任務被執行,服務儲存Future物件用來在佇列中控制執行。poll()方法進入佇列判斷是否有任務已經完成執行,如果有,則返回佇列的第一個元素,即任務已經完成執行的Future物件。當poll()方法返回Future物件時,此物件從佇列中刪除自己。這種情況下,假定佇列包含的完成任務的結果為空,則傳遞兩個屬性到此方法,指明想要等待任務完成的時間。
一旦CompletionService物件被建立,則建立了兩個ReportRequest物件用來執行一個ReportGenerator任務,使用之前建立的CompletionService物件執行另一個ReportGenerator任務,此物件作為引數傳遞到ReportGenerator物件的建構函式中。
擴充套件學習
CompletionService類能夠執行Callable或者Runnable任務,本範例中用到了Callable,但是也可以傳送Runnable物件。由於Runnable物件不產生結果,所以CompletionService類設計也沒有考慮到此情形。
這個類提供了兩個方法獲得完成任務的Future物件,如下所示:
- poll():不帶引數的poll()方法判斷佇列中是否有Future物件。如果佇列為空,立即返回空,否則,返回第一個元素並從佇列中刪除它。
- take():此方法不帶引數,判斷佇列中是否有Future物件。如果佇列為空,則阻塞執行緒直到佇列具有一個元素,如果佇列包含元素,此方法返回第一個元素,並且從佇列中刪除它。
本範例中,超時使用poll()方法來控制結束ReportProcessor任務的執行。
更多關注
本章“執行器中執行返回結果的任務“小節。