1. 程式人生 > >執行器中分離任務載入和結果處理

執行器中分離任務載入和結果處理

Java 9併發程式設計指南 目錄

執行器中分離任務載入和結果處理

通常情況,當使用執行器執行併發任務時,會發送Runnable或Callable任務到執行器並且獲得Future物件來控制方法。會出現需要在一個物件中傳送任務到執行器,且在另一個物件中處理結果的局面。Java併發API提供的CompletionService類用來應對這種狀況的發生。

CompletionService類提供傳送任務到執行器的方法,以及為已經完成執行的下一個任務獲得Future物件。此類內部使用Executor物件執行任務,這鐘特性的優點是共享CompletionService物件且傳送任務到執行器以便其他任務能夠處理結果。不足之處則是第二個物件只能為已經完成執行的任務獲得Future物件,所以這些Future物件只能用來得到任務結果。

本節中,學習如何使用CompletionService類將執行器中載入任務過程與任務結果的處理分開。

準備工作

本範例通過Eclipse開發工具實現。如果使用諸如NetBeans的開發工具,開啟並建立一個新的Java專案。

實現過程

通過如下步驟完成範例:

  1. 建立名為ReportGenerator的類,實現String類引數化的Callable介面:

    public class ReportGenerator implements Callable<String>{
    
  2. 定義名為sender和title的兩個私有String屬性,用來定義報告內容:

    	private final String sender;
    	private final String title;
    
  3. 實現類建構函式,初始化兩個屬性:

    	public ReportGenerator(String sender, String title) {
    		this.sender = sender;
    		this.title = title;
    	}
    
  4. 實現call()方法。首先,設定執行緒隨機休眠一段時間:

    	@Override
    	public String call() throws Exception {
    		try{
    			Long duration = (
    long)(Math.random() * 10); System.out.printf("%s_%s : ReportGenerator : Generating a report during %d seconds\n", this.sender, this.title, duration); TimeUnit.SECONDS.sleep(duration); } catch(InterruptedException e){ e.printStackTrace(); }
  5. 然後用sender和title屬性內容生成字串的報告內容,並返回內容:

    		String ret = sender + " :  " + title;
    		return ret;
    	}
    
  6. 建立名為ReportRequest的類,實現Runnable介面,用來模擬一些報告請求:

    public class ReportRequest implements Runnable{
    
  7. 定義名為name的私有String屬性,儲存ReportRequest的名稱:

    	private final String name;
    
  8. 定義名為service的私有CompletionService屬性,CompletionService介面是String類引數化的介面:

    	private final CompletionService<String> service;
    
  9. 實現類建構函式,初始化兩個屬性:

    	public ReportRequest(String name, CompletionService<String> service) {
    		this.name = name;
    		this.service = service;
    	}
    
  10. 實現run()方法,建立三個ReportGenerator物件,使用submit()方法傳送到CompletionService物件:

    	@Override
    	public void run() {
    		ReportGenerator reportGenerator = new ReportGenerator(name, "Report");
    		service.submit(reportGenerator);
    	}
    
  11. 建立名為ReportProcessor的類,此類將得到ReportGenerator任務的結果,指定其實現Runnable介面:

    public class ReportProcessor implements Runnable{
    
  12. 定義名為service的私有CompletionService屬性,因為CompletionService介面是引數化介面,所以使用String類作為CompletionService介面的引數:

    	private final CompletionService<String> service;
    
  13. 定義名為end的私有Boolean屬性,新增volatile 關鍵字確保所有執行緒擁有此屬性實際值的使用權:

    	private volatile boolean end;
    
  14. 定義類建構函式,初始化這兩個屬性:

    	public ReportProcessor(CompletionService<String> service) {
    		this.service = service;
    		end = false;
    	}
    
  15. 實現run()方法,當end屬性是false時,呼叫CompletionService介面的poll()方法,獲得下一個任務的Future物件,此任務被已經結束的完成服務執行:

    	@Override
    	public void run() {
    		while (!end){
    			try {
    				Future<String> result = service.poll(20, TimeUnit.SECONDS);
    
  16. 然後,使用Future物件的get()方法得到任務結果,並輸出結果到控制檯:

    				if(result != null) {
    					String report = result.get();
    					System.out.printf("ReportReceiver : Report Received : %s\n", report);
    				}
    			} catch (InterruptedException | ExecutionException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}	
    		}
    		System.out.printf("ReportSender : End\n");
    	}
    
  17. 實現stopProcessing()方法來改變end屬性值:

    	public void stopProcessing(){
    		this.end = true;
    	}
    
  18. 實現範例的主方法,建立一個包含main()方法的Main類:

    public class Main {
    	public static void main(String[] args) {
    
  19. 使用Executors類的newCachedThreadPool()方法建立ThreadPoolExecutor:

    		ExecutorService executor = Executors.newCachedThreadPool();
    
  20. 使用執行器建立CompletionService,此執行器作為前面的建構函式的引數被建立:

    		CompletionService<String> service = new ExecutorCompletionService<>(executor);
    
  21. 建立兩個ReportRequest物件,以及執行它們的執行緒:

    		ReportRequest faceRequest = new ReportRequest("Face", service);
    		ReportRequest onlineRequest = new ReportRequest("Online", service);
    		
    		Thread faceThread = new Thread(faceRequest);
    		Thread onlineThread = new Thread(onlineRequest);
    
  22. 建立ReportProcessor物件,以及執行它的執行緒:

    		ReportProcessor processor = new ReportProcessor(service);
    		Thread senderThread = new Thread(processor);
    
  23. 啟動三個執行緒:

    		System.out.printf("Main : Starting the Threads\n");
    		faceThread.start();
    		onlineThread.start();
    		senderThread.start();
    
  24. 等待ReportRequest執行緒的結束:

    		try {
    			System.out.printf("Main : Waiting for the report generators.\n");
    			faceThread.join();
    			onlineThread.join();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    
  25. 使用shutdown()方法結束執行器,然後使用awaitTermination()方法等待任務的結束:

    		System.out.printf("Main : Shutting down the executor.\n");
    		executor.shutdown();
    		try {
    			executor.awaitTermination(1, TimeUnit.DAYS);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    
  26. 結束ReportSender物件的執行,設定end屬性值為true:

    		processor.stopProcessing();
    		System.out.println("Main : Ends");
    

工作原理

本範例主類中,使用Executors類的newCachedThreadPool()方法建立ThreadPoolExecutor。然後,用到Executor物件初始化CompletionService物件,因為完成服務使用執行器執行其任務。為了使用完成服務執行任務,用到ReportRequest類中的submit()方法。

當完成服務結束其執行時,其中一個任務被執行,服務儲存Future物件用來在佇列中控制執行。poll()方法進入佇列判斷是否有任務已經完成執行,如果有,則返回佇列的第一個元素,即任務已經完成執行的Future物件。當poll()方法返回Future物件時,此物件從佇列中刪除自己。這種情況下,假定佇列包含的完成任務的結果為空,則傳遞兩個屬性到此方法,指明想要等待任務完成的時間。

一旦CompletionService物件被建立,則建立了兩個ReportRequest物件用來執行一個ReportGenerator任務,使用之前建立的CompletionService物件執行另一個ReportGenerator任務,此物件作為引數傳遞到ReportGenerator物件的建構函式中。

擴充套件學習

CompletionService類能夠執行Callable或者Runnable任務,本範例中用到了Callable,但是也可以傳送Runnable物件。由於Runnable物件不產生結果,所以CompletionService類設計也沒有考慮到此情形。

這個類提供了兩個方法獲得完成任務的Future物件,如下所示:

  • poll():不帶引數的poll()方法判斷佇列中是否有Future物件。如果佇列為空,立即返回空,否則,返回第一個元素並從佇列中刪除它。
  • take():此方法不帶引數,判斷佇列中是否有Future物件。如果佇列為空,則阻塞執行緒直到佇列具有一個元素,如果佇列包含元素,此方法返回第一個元素,並且從佇列中刪除它。

本範例中,超時使用poll()方法來控制結束ReportProcessor任務的執行。

更多關注

本章“執行器中執行返回結果的任務“小節。