1. 程式人生 > 其它 >Java Callable and Future學習

Java Callable and Future學習

在本教程中,我們將學習Callable and Future。

Callable

在先前的教程中,我們使用一個Runnable物件來定義線上程內執行的任務。儘管使用定義任務Runnable非常方便,但是由於任務無法返回結果而受到限制。

如果要從任務中返回結果怎麼辦?

好吧,Java提供了一個Callable介面來定義返回結果的任務。ACallable類似於,Runnable除了它可以返回結果並引發檢查的異常。

Callable介面具有單個方法call(),該方法旨在包含執行緒執行的程式碼。這是一個簡單的Callable的示例-

Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
        // Perform some computation
        Thread.sleep(2000);
        return "Return some result";
    }
};

請注意,使用時Callable,您不需要Thread.sleep()被try / catch塊包圍,因為與Runnable不同,Callable可以引發已檢查的異常。

您也可以像這樣將lambda表示式與Callable一起使用-

Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};

使用ExecutorService執行可呼叫任務,並使用Future獲得結果

就像一樣Runnable

,您可以將提交Callable給執行者服務以執行。但是Callable的結果呢?您如何訪問它?

submit()執行程式服務的方法將任務提交給執行緒執行。但是,它不知道所提交任務的結果何時可用。因此,它返回一種稱為a的特殊型別的值,該值Future可用於在任務可用時獲取任務的結果。

的概念Future類似於Java等其他語言中的Promise。它表示將在以後的較晚時間點完成的計算結果。

以下是Future和Callable的簡單示例-

import java.util.concurrent.*;

public class FutureAndCallableExample {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		Callable<String> callable = () -> {
			// Perform some computation
			System.out.println("Entered Callable");
			Thread.sleep(2000);
			return "Hello from Callable";
		};

		System.out.println("Submitting Callable");
		Future<String> future = executorService.submit(callable);

		// This line executes immediately
		System.out.println("Do something else while callable is getting executed");

		System.out.println("Retrieve the result of the future");
		// Future.get() blocks until the result is available
		String result = future.get();
		System.out.println(result);

		executorService.shutdown();
	}

}
# Output
Submitting Callable
Do something else while callable is getting executed
Retrieve the result of the future
Entered Callable
Hello from Callable

ExecutorService.submit()方法立即返回並給您Future。一旦獲得了未來,就可以在執行提交的任務時並行執行其他任務,然後使用future.get()方法來檢索未來的結果。

請注意,該get()方法將阻塞直到任務完成。該FutureAPI還提供了一個isDone()方法來檢查任務是否完成或不-

import java.util.concurrent.*;

public class FutureIsDoneExample {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		Future<String> future = executorService.submit(() -> {
			Thread.sleep(2000);
			return "Hello from Callable";
		});

		while (!future.isDone()) {
			System.out.println("Task is still not done...");
			Thread.sleep(200);
		}

		System.out.println("Task completed! Retrieving the result");
		String result = future.get();
		System.out.println(result);

		executorService.shutdown();
	}
}
# Output
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Hello from Callable

取消未來

您可以取消將來的使用Future.cancel()方法。它嘗試取消任務的執行,如果成功取消,則返回true,否則返回false。

cancel()方法接受布林引數-mayInterruptIfRunning。如果傳遞true此引數的值,則當前正在執行任務的執行緒將被中斷,否則將允許正在進行的任務完成。

您可以使用isCancelled()方法來檢查任務是否被取消。同樣,取消任務後,isDone()將始終為真。

import java.util.concurrent.*;

public class FutureCancelExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        long startTime = System.nanoTime();
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "Hello from Callable";
        });

        while(!future.isDone()) {
            System.out.println("Task is still not done...");
            Thread.sleep(200);
            double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;

            if(elapsedTimeInSec > 1) {
                future.cancel(true);
            }
        }

        System.out.println("Task completed! Retrieving the result");
        String result = future.get();
        System.out.println(result);

        executorService.shutdown();
    }
}
# Output
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Exception in thread "main" java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask.report(FutureTask.java:121)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at FutureCancelExample.main(FutureCancelExample.java:34)

如果執行上面的程式,它將丟擲異常,因為如果取消任務,future.get()方法將丟擲異常CancellationException。我們可以通過在獲取結果之前檢查是否取消了期貨來處理這一事實-

if(!future.isCancelled()) {
    System.out.println("Task completed! Retrieving the result");
    String result = future.get();
    System.out.println(result);
} else {
    System.out.println("Task was cancelled");
}

新增超時

future.get()方法將阻止並等待任務完成。如果您在可呼叫任務中從遠端服務呼叫API,並且遠端服務已關閉,則future.get()它將永遠阻塞,這將使應用程式無響應。

為了避免這種情況,您可以在get()方法中新增超時-

future.get(1, TimeUnit.SECONDS);

如果任務沒有在指定的時間內完成,則該future.get()方法將丟擲TimeoutException

invokeAll

提交多個任務,等待所有任務完成。

您可以通過將一組Callables傳遞給invokeAll()方法來執行多個任務。在invokeAll()返回的Future列表。future.get()在所有期貨都完成之前,對的任何呼叫都將阻塞。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        Callable<String> task1 = () -> {
            Thread.sleep(2000);
            return "Result of Task1";
        };

        Callable<String> task2 = () -> {
            Thread.sleep(1000);
            return "Result of Task2";
        };

        Callable<String> task3 = () -> {
            Thread.sleep(5000);
            return "Result of Task3";
        };

        List<Callable<String>> taskList = Arrays.asList(task1, task2, task3);

        List<Future<String>> futures = executorService.invokeAll(taskList);

        for(Future<String> future: futures) {
            // The result is printed only after all the futures are complete. (i.e. after 5 seconds)
            System.out.println(future.get());
        }

        executorService.shutdown();
    }
}
# Output
Result of Task1
Result of Task2
Result of Task3

在上述程式中,對future.get()語句的首次呼叫會阻塞,直到所有期貨均完成。即結果將在5秒鐘後打印出來。

invokeAny

提交多個任務,等待其中任何一個完成

invokeAny()方法接受的集合Callables並返回最快的Callable的結果。請注意,它不會返回Future。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAnyExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        Callable<String> task1 = () -> {
            Thread.sleep(2000);
            return "Result of Task1";
        };

        Callable<String> task2 = () -> {
            Thread.sleep(1000);
            return "Result of Task2";
        };

        Callable<String> task3 = () -> {
            Thread.sleep(5000);
            return "Result of Task3";
        };

        // Returns the result of the fastest callable. (task2 in this case)
        String result = executorService.invokeAny(Arrays.asList(task1, task2, task3));

        System.out.println(result);

        executorService.shutdown();
    }
}
# Output
Result of Task2

結論

您可以在我的github儲存庫中找到本教程中使用的所有程式碼段。我鼓勵您分叉儲存庫並自己練習程式。