1. 程式人生 > >執行緒執行者(十一)執行者分離任務的啟動和結果的處理

執行緒執行者(十一)執行者分離任務的啟動和結果的處理

宣告:本文是《 Java 7 Concurrency Cookbook 》的第四章,作者: Javier Fernández González     譯者:許巧輝     校對:方騰飛,葉磊

執行者分離任務的啟動和結果的處理

通常,當你使用執行者執行併發任務時,你將會提交 Runnable或Callable任務給這個執行者,並獲取Future物件控制這個方法。你可以發現這種情況,你需要提交任務給執行者在一個物件中,而處理結果在另一個物件中。基於這種情況,Java併發API提供CompletionService類。

CompletionService 類有一個方法來提交任務給執行者和另一個方法來獲取已完成執行的下個任務的Future物件。在內部實現中,它使用Executor物件執行任務。這種行為的優點是共享一個CompletionService物件,並提交任務給執行者,這樣其他(物件)可以處理結果。其侷限性是,第二個物件只能獲取那些已經完成它們的執行的任務的Future物件,所以,這些Future物件只能獲取任務的結果。

在這個指南中,你將學習如何使用CompletionService類把執行者啟動任務和處理它們的結果分開。

準備工作…

這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。

如何做…

按以下步驟來實現的這個例子:

1.建立ReportGenerator類,並指定其實現Callable介面,引數化為String型別。

public class ReportGenerator implements Callable<String> {

2.宣告兩個私有的、String型別的屬性,sender和title,用來表示報告的資料。

private String sender;
private String title;

3.實現這個類的構造器,初始化這兩個屬性。

public ReportGenerator(String sender, String title){
this.sender=sender;
this.title=title;
}

4.實現call()方法。首先,讓執行緒睡眠一段隨機時間。

@Override
public String call() throws Exception {
try {
Long duration=(long)(Math.random()*10);
System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender,this.title,duration);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}

5.然後,生成一個有sender和title屬性的字串的報告,返回這個字串。

String ret=sender+": "+title;
return ret;
}

6.建立ReportRequest類,實現Runnable介面。這個類將模擬一些報告請求。

public class ReportRequest implements Runnable {

7.宣告私有的、String型別的屬性name,用來儲存ReportRequest的名稱。

private String name;

8.宣告私有的、CompletionService型別的屬性service。CompletionService介面是個引數化介面,使用String型別引數化它。

private CompletionService<String> service;

9.實現這個類的構造器,初始化這兩個屬性。

public ReportRequest(String name, CompletionService<String> service){
this.name=name;
this.service=service;
}

10.實現run()方法。建立1個ReportGenerator物件,並使用submit()方法把它提交給CompletionService物件。

@Override
public void run() {
ReportGenerator reportGenerator=new ReportGenerator(name,"Report");
service.submit(reportGenerator);
}

11.建立ReportProcessor類。這個類將獲取ReportGenerator任務的結果,指定它實現Runnable介面。

public class ReportProcessor implements Runnable {

12.宣告一個私有的、CompletionService型別的屬性service。由於CompletionService介面是個引數化介面,使用String類作為這個CompletionService介面的引數。

private CompletionService<String> service;

13.宣告一個私有的、boolean型別的屬性end。

private boolean end;

14.實現這個類的構造器,初始化這兩個屬性。

public ReportProcessor (CompletionService<String> service){
this.service=service;
end=false;
}

15.實現run()方法。當屬性end值為false,呼叫CompletionService介面的poll()方法,獲取CompletionService執行的下個已完成任務的Future物件。

@Override
public void run() {
while (!end){
try {
Future<String> result=service.poll(20, TimeUnit.SECONDS);

16.然後,使用Future物件的get()方法獲取任務的結果,並且將這些結果寫入到控制檯。

if (result!=null) {
String report=result.get();
System.out.printf("ReportReceiver: Report Received:%s\n",report);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.printf("ReportSender: End\n");
}

17.實現setEnd()方法,用來修改屬性end的值。

public void setEnd(boolean end) {
this.end = end;
}

18.實現這個示例的主類,通過建立Main類,並實現main()方法。

public class Main {
public static void main(String[] args) {

19.使用Executors類的newCachedThreadPool()方法建立ThreadPoolExecutor。

ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();

20.建立CompletionService,使用前面建立的執行者作為構造器的引數。

CompletionService<String> service=new ExecutorCompletionService<>(executor);

21.建立兩個ReportRequest物件,並用執行緒執行它們。

ReportRequest faceRequest=new ReportRequest("Face", service);
ReportRequest onlineRequest=new ReportRequest("Online";,service);
Thread faceThread=new Thread(faceRequest);
Thread onlineThread=new Thread(onlineRequest);

22.建立一個ReportProcessor物件,並用執行緒執行它。

ReportProcessor processor=new ReportProcessor(service);
Thread senderThread=new Thread(processor);

23.啟動這3個執行緒。

System.out.printf("Main: Starting the Threads\n");
faceThread.start();
onlineThread.start();
senderThread.start();

24.等待ReportRequest執行緒的結束。

try {
System.out.printf("Main: Waiting for the report
generators.\n");
faceThread.join();
onlineThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

25.使用shutdown()方法關閉執行者,使用awaitTermination()方法等待任務的結果。

System.out.printf("Main: Shutting down the executor.\n");
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}

26.設定ReportSender物件的end屬性值為true,結束它的執行。

processor.setEnd(true);
System.out.println("Main: Ends");

這是如何工作的…

在示例的主類中,你使用Executors類的newCachedThreadPool()方法建立ThreadPoolExecutor。然後,使用這個物件初始化一個CompletionService物件,因為CompletionService需要使用一個執行者來執行任務。利用CompletionService執行一個任務,你需要使用submit()方法,如在ReportRequest類中。

當其中一個任務被執行,CompletionService完成這個任務的執行時,這個CompletionService在一個佇列中儲存Future物件來控制它的執行。poll()方法用來檢視這個列隊,如果有任何任務執行完成,那麼返回列隊的第一個元素,它是一個已完成任務的Future物件。當poll()方法返回一個Future物件時,它將這個Future物件從佇列中刪除。這種情況下,你可以傳兩個屬性給那個方法,表明你想要等任務結果的時間,以防佇列中的已完成任務的結果是空的。

一旦CompletionService物件被建立,你建立2個ReportRequest物件,用來執行3個ReportGenerator任務,每個都在CompletionService中,和一個ReportSender任務,它將會處理已提交給2個ReportRequest物件的任務所產生的結果。

不止這些…

CompletionService類可以執行Callable和Runnable任務。在這個示例中,你已經使用Callable,但你同樣可以提交Runnable物件。由於Runnable物件不會產生結果,CompletionService類的理念不適用於這些情況。

這個類同樣提供其他兩個方法,用來獲取已完成任務的Future物件。這兩個方法如下:

  • poll():不帶引數版本的poll()方法,檢查是否有任何Future物件在佇列中。如果列隊是空的,它立即返回null。否則,它返回第一個元素,並從列隊中刪除它。
  • take():這個方法,不帶引數。檢查是否有任何Future物件在佇列中。如果佇列是空的,它阻塞執行緒直到佇列有一個元素。當佇列有元素,它返回第一元素,並從列隊中刪除它。

參見

  • 在第4章,執行緒執行者中的執行者執行返回結果的任務指南