深入學習理解(1):java:ExecutorService invokeAll 任務的批量提交invokeAll兩種方法的區別
阿新 • • 發佈:2019-01-29
ExecutorService的invokeAll方法有兩種用法:
1.exec.invokeAll(tasks)
2.exec.invokeAll(tasks, timeout, unit)
其中tasks是任務集合,timeout是超時時間,unit是時間單位
兩者都會堵塞,必須等待所有的任務執行完成後統一返回,一方面記憶體持有的時間長;另一方面響應性也有一定的影響,畢竟大家都喜歡看看刷刷的執行結果輸出,而不是苦苦的等待;
但是方法二增加了超時時間控制,這裡的超時時間是針對的所有tasks,而不是單個task的超時時間。如果超時,會取消沒有執行完的所有任務,並丟擲超時異常。相當於將每一個future的執行情況用一個list集合儲存,當呼叫future.get()方法取值時和設定的timeout比較,是否超時。
InvokeAll方法處理一個任務的容器(collection),並返回一個Future的容器。兩個容器具有相同的結構;
這裡提交的任務容器列表和返回的Future列表存在順序對應的關係。
invokeAll將Future新增到返回容器中,這樣可以使用任務容器的迭代器,從而呼叫者可以將它表現的Callable與Future關聯起來。
當所有任務都完成時、呼叫執行緒被中斷時或者超過時限時,限時版本的invokeAll都會返回結果。超過時限後,任何尚未完成的任務都會被取消。
作為invokeAll的返回值,每個任務要麼正常地完成,要麼被取消。
invokeAll控制批量任務的時間期限的例子:
package com.thread; import java.math.BigDecimal; import java.sql.Time; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 批量任務的限時 invokeAll(tasks) 批量提交不限時任務 * * invokeAll(tasks, timeout, unit) 批量提交限時任務 * * InvokeAll方法處理一個任務的容器(collection),並返回一個Future的容器。兩個容器具有相同的結構: * invokeAll將Future新增到返回的容器中,這樣可以使用任務容器的迭代器,從而呼叫者可以將它表現的Callable與Future 關聯起來。 * 當所有任務都完成時、呼叫執行緒被中斷時或者超過時限時,限時版本的invokeAll都會返回結果。 超過時限後,任務尚未完成的任務都會被取消。 * * @author hadoop * */ public class InvokeAllThread { // 固定大小的執行緒池,同時只能接受5個任務 static ExecutorService mExecutor = Executors.newFixedThreadPool(5); /** * 計算價格的任務 * @author hadoop * */ private class QuoteTask implements Callable<BigDecimal> { public final double price; public final int num; public QuoteTask(double price, int num) { this.price = price; this.num = num; } @Override public BigDecimal call() throws Exception { Random r = new Random(); long time = (r.nextInt(10) + 1) * 1000; Thread.sleep(time); BigDecimal d = BigDecimal.valueOf(price * num).setScale(2); System.out.println("耗時:" + time / 1000 + "s,單價是:" + price + ",人數是:" + num + ",總額是:" + d); return d; } } /** * 在預定時間內請求獲得旅遊報價資訊 * * @return */ public void getRankedTravelQuotes() throws InterruptedException { List<QuoteTask> tasks = new ArrayList<QuoteTask>(); // 模擬10個計算旅遊報價的任務 for (int i = 1; i <= 20; i++) { tasks.add(new QuoteTask(200, i) ); } /** * 使用invokeAll方法批量提交限時任務任務 預期15s所有任務都執行完,沒有執行完的任務會自動取消 * */ List<Future<BigDecimal>> futures = mExecutor.invokeAll(tasks, 15, TimeUnit.SECONDS); // 報價合計集合 List<BigDecimal> totalPriceList = new ArrayList<BigDecimal>(); Iterator<QuoteTask> taskIter = tasks.iterator(); for (Future<BigDecimal> future : futures) { QuoteTask task = taskIter.next(); try { totalPriceList.add(future.get()); } catch (ExecutionException e) { // 返回計算失敗的原因 // totalPriceList.add(task.getFailureQuote(e.getCause())); totalPriceList.add(BigDecimal.valueOf(-1)); System.out.println("任務執行異常,單價是"+task.price+",人數是:"+task.num); } catch (CancellationException e) { // totalPriceList.add(task.getTimeoutQuote(e)); totalPriceList.add(BigDecimal.ZERO); System.out.println("任務超時,取消計算,單價是"+task.price+",人數是:"+task.num); } } for (BigDecimal bigDecimal : totalPriceList) { System.out.println(bigDecimal); } mExecutor.shutdown(); } public static void main(String[] args) { try { InvokeAllThread it = new InvokeAllThread(); it.getRankedTravelQuotes(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }