Java使用Executor執行Callable任務時的幾種方法
阿新 • • 發佈:2018-11-29
多執行緒在需要返回值時,我們知道需要用到Callable和Future。Callable的cell方法可以返回一個值並且可丟擲異常,是對Runnable的很好的補充;Future表示了一個任務的週期,它提供了判斷任務狀態、獲取任務結果和取消任務等方法 。
下面演示三種使用Executor執行Callable任務的方法。
/** * 測試任務,返回任務的序號 */ public static class TestTask implements Callable<Integer>{ int index; public TestTask(int index) { this.index = index; } @Override public Integer call() throws Exception { return index; } } /** * 方法一:手動的儲存任務的返回,這樣的好處是每個任務對應的結果我們很清楚 */ @Test public void ordinaryTest() throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); List<Future<Integer>> futures = new ArrayList<>(); for(int i = 0; i < 10; i++) { TestTask testTask = new TestTask(i); Future<Integer> future = es.submit(testTask); futures.add(future); } es.shutdown(); for(int i = 0; i < 10; i++) { System.out.println("index:" + i + ",future:"+ futures.get(i).get()); } } /** * 方法二:使用ExecutorCompletionService * ExecutorCompletionService中使用阻塞佇列儲存各任務的返回結果,返回是無序的,即誰先執行完成(異常、中斷),誰先入隊。 * 當我們不關心結果的順序,或者需要一個任務完成時就取消其他任務的情況下,它是非常的方便的 */ @Test public void completionServiceTest() throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); CompletionService<Integer> completionService = new ExecutorCompletionService<>(es); for(int i = 0; i < 10; i++) { TestTask testTask = new TestTask(i); completionService.submit(testTask); } es.shutdown(); for(int i = 0; i < 10; i++) { Future<Integer> future = completionService.take(); System.out.println("index:" + i + ",future:"+ future.get()); } } /** * 方法三:ExecutorService的invokeAll方法 * invokeAll方法入參為一組任務,返回一組Future,這兩個集合是有相同結構的, * 即它是按照入參的任務集合中迭代器的順序將所有的Future新增到返回的集合中,從而任務和Future在它們各自的集合中有著同樣的順序。 * 當我們需要任務和結果的對應關係時,使用invokeAll方法來代替第一種方法 */ @Test public void invokeAllTest() throws InterruptedException, ExecutionException { ExecutorService es = Executors.newFixedThreadPool(10); List<TestTask> tasks = new ArrayList<>(); for (int i = 0; i < 10; i++){ tasks.add(new TestTask(i)); } List<Future<Integer>> futures = es.invokeAll(tasks); es.shutdown(); for (int i = 0; i < futures.size(); i++){ System.out.println("index:" + i + ",future:"+ futures.get(i).get()); } }
下面看一下ExecutorCompletionService的原理:
ExecutorCompletionService是將Executor和BlockingQueue的功能融合在一起,可將Callbale任務提交給它來執行,然後我們就可以像佇列一樣使用take或poll來得到已經完成的任務結果。下面是原始碼分析:
/** *ExecutorCompletionService包含三個成員變數,最主要的是completionQueue,它的型別阻塞佇列 */ private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; /** * 構造方法,需要我們傳入一個Executor */ public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } /** * 提交任務的方法,其中的RunnableFuture為一個內部類,繼承自FutureTask */ public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } /** * 這是QueueingFuture存在的主要原因,當任務執行完成後,將任務結果裝入佇列中 */ protected void done() { completionQueue.add(task); } private final Future<V> task; } /** * 從佇列中獲取返回值 */ public Future<V> take() throws InterruptedException { return completionQueue.take(); }