1. 程式人生 > >Java併發框架:Executor

Java併發框架:Executor

介紹

隨著當今處理器中可用的核心數量的增加, 隨著對實現更高吞吐量的需求的不斷增長,多執行緒 API 變得非常流行。 Java 提供了自己的多執行緒框架,稱為 Executor 框架.

1. Executor 框架是什麼?

Executor 框架包含一組用於有效管理工作執行緒的元件。Executor API 通過 Executors 將任務的執行與要執行的實際任務解耦。 這是 生產者-消費者 模式的一種實現。

java.util.concurrent.Executors 提供了用於建立工作執行緒的執行緒池

的工廠方法。

為了使用 Executor 框架,我們需要建立一個執行緒池並提交任務給它以供執行。Executor 框架的工作是排程和執行已提交的任務並從執行緒池中拿到返回的結果。

浮現於腦海中的一個基本的問題是,當我們建立 java.lang.Thread 的物件或呼叫實現了 Runnable/Callable 介面來達到的程式的並行性時,為什麼需要執行緒池?

答案來源於兩個基本面:

  1. 為新任務建立新的執行緒會存在額外的執行緒建立以及銷燬的開銷。管理這些執行緒的生命週期會明顯增加 CPU 的執行時間。
  2. 不進行任何限制地為每個程序建立執行緒會導致建立大量執行緒。這些執行緒會佔用大量記憶體並引起資源的浪費。當一個執行緒利用完 CPU 的時間片後另一個執行緒即將利用CPU的時間片時,CPU 會花費大量的時間來切換執行緒的上下文。

所有的這些因素都會導致系統的吞吐量下降。執行緒池通過保持執行緒一直存活並重用這些執行緒來克服這個問題。當提交到執行緒池中的任務多於正在執行的執行緒時,那些多餘的任務將被放到佇列中。 一旦執行任務的執行緒有空閒的了,它們會從佇列中取下一個任務來執行。對於 JDK 提供的現成的 executors 此任務佇列基本是無界的。

2. Executors 的型別

現在我們已經瞭解了 executors 是什麼, 讓我們來看看不同型別的 executors。

2.1 SingleThreadExecutor

此執行緒池 executor 只有一個執行緒。它用於以順序方式的形式執行任務。如果此執行緒在執行任務時因異常而掛掉,則會建立一個新執行緒來替換此執行緒,後續任務將在新執行緒中執行。

ExecutorService executorService = Executors.newSingleThreadExecutor()

2.2 FixedThreadPool(n)

顧名思義,它是一個擁有固定數量執行緒的執行緒池。提交給 executor 的任務由固定的 n 個執行緒執行,如果有更多的任務,它們儲存在 LinkedBlockingQueue 裡。這個數字 n 通常跟底層處理器支援的執行緒總數有關。

ExecutorService executorService = Executors.newFixedThreadPool(4);

2.3 CachedThreadPool

該執行緒池主要用於執行大量短期並行任務的場景。與固定執行緒池不同,此執行緒池的執行緒數不受限制。如果所有的執行緒都在忙於執行任務並且又有新的任務到來了,這個執行緒池將建立一個新的執行緒並將其提交到 executor。只要其中一個執行緒變為空閒,它就會執行新的任務。 如果一個執行緒有 60 秒的時間都是空閒的,它們將被結束生命週期並從快取中刪除。

但是,如果管理得不合理,或者任務不是很短的,則執行緒池將包含大量的活動執行緒。這可能導致資源紊亂並因此導致效能下降。

ExecutorService executorService = Executors.newCachedThreadPool();

2.4 ScheduledExecutor

當我們有一個需要定期執行的任務或者我們希望延遲某個任務時,就會使用此型別的 executor。

ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(1);

可以使用 scheduleAtFixedRate 或 scheduleWithFixedDelay 在 ScheduledExecutor 中定期的執行任務。

scheduledExecService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduledExecService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)

這兩種方法的主要區別在於它們對連續執行定期任務之間的延遲的應答。

scheduleAtFixedRate:無論前一個任務何時結束,都以固定間隔執行任務。

scheduleWithFixedDelay:只有在當前任務完成後才會啟動延遲倒計時。

3. 對於 Future 物件的理解

可以使用 executor 返回的 java.util.concurrent.Future 物件訪問提交給 executor 的任務的結果。 Future 可以被認為是 executor 對呼叫者的響應。

Future<String> result = executorService.submit(callableTask);

如上所述,提交給 executor 的任務是非同步的,即程式不會等待當前任務執行完成,而是直接進入下一步。相反,每當任務執行完成時,executor 在此 Future物件中設定它。

呼叫者可以繼續執行主程式,當需要提交任務的結果時,他可以在這個 Future物件上呼叫.get() 方法來獲取。如果任務完成,結果將立即返回給呼叫者,否則呼叫者將被阻塞,直到 executor 完成此操作的執行並計算出結果。

如果呼叫者不能無限期地等待任務執行的結果,那麼這個等待時間也可以設定為定時地。可以通過 Future.get(long timeout,TimeUnit unit) 方法實現,如果在規定的時間範圍內沒有返回結果,則丟擲 TimeoutException。呼叫者可以處理此異常並繼續執行該程式。

如果在執行任務時出現異常,則對 get 方法的呼叫將丟擲一個ExecutionException

對於 Future.get()方法返回的結果,一個重要的事情是,只有提交的任務實現了java.util.concurrent.Callable介面時才返回 Future。如果任務實現了Runnable介面,那麼一旦任務完成,對 .get() 方法的呼叫將返回 null

另一個關注點是 Future.cancel(boolean mayInterruptIfRunning) 方法。此方法用於取消已提交任務的執行。如果任務已在執行,則 executor 將嘗試在mayInterruptIfRunning 標誌為 true 時中斷任務執行。

4. Example: 建立和執行一個簡單的 Executor

我們現在將建立一個任務並嘗試在 fixed pool executor 中執行它:

public class Task implements Callable<String> {

    private String message;

    public Task(String message) {
        this.message = message;
    }

    @Override
    public String call() throws Exception {
        return "Hello " + message + "!";
    }
}

Task 類實現 Callable 介面並有一個 String 型別作為返回值的方法。 這個方法也可以丟擲 Exception。這種向 executor 丟擲異常的能力以及 executor 將此異常返回給呼叫者的能力非常重要,因為它有助於呼叫者知道任務執行的狀態。

現在讓我們來執行一下這個任務:

public class ExecutorExample {  
    public static void main(String[] args) {

        Task task = new Task("World");

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Future<String> result = executorService.submit(task);

        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Error occured while executing the submitted task");
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}

我們建立了一個具有4個執行緒數的 FixedThreadPool executors,因為這個 demo是在四核處理器上開發的。如果正在執行的任務執行大量 I/O 操作或花費較長時間等待外部資源,則執行緒數可能超過處理器的核心數。

我們例項化了 Task 類,並將它提交給 executors 執行。 結果由 Future 物件返回,然後我們在螢幕上列印。

讓我們執行 ExecutorExample 並檢視其輸出:

Hello World!

正如所料,任務追加了問候語 Hello 並通過 Future object 返回結果。

最後,我們呼叫 executorService 物件上的 shutdown 來終止所有執行緒並將資源返回給 OS。

.shutdown() 方法等待 executor 完成當前提交的任務。 但是,如果要求是立即關閉 executor 而不等待,那麼我們可以使用 .shutdownNow() 方法。

任何待執行的任務都將結果返回到 java.util.List 物件中。

我們也可以通過實現 Runnable 介面來建立同樣的任務:

public class Task implements Runnable{

    private String message;

    public Task(String message) {
        this.message = message;
    }

    public void run() {
        System.out.println("Hello " + message + "!");
    }
}

當我們實現 Runnable 時,這裡有一些重要的變化。

  1. 無法從 run() 方法得到任務執行的結果。 因此,我們直接在這裡列印。
  2. run() 方法不可丟擲任何已受檢的異常。

5. 總結

隨著處理器時鐘速度難以提高,多執行緒正變得越來越主流。 但是,由於涉及複雜性,處理每個執行緒的生命週期非常困難。

在本文中,我們展示了一個高效而簡單的多執行緒框架,即 Executor Framework,並解釋了它的不同元件。 我們還看了一下在 executor 中建立提交和執行任務的不同示例。

與往常一樣,此示例的程式碼可以在 GitHub上找到。

原文:https://stackabuse.com/concurrency-in-java-the-executor-framework/

作者:Chandan Singh

譯者:KeepGoingPawn