1. 程式人生 > >Java高併發程式設計(十一):Java中執行緒池

Java高併發程式設計(十一):Java中執行緒池

在開發過程中,合理地使用執行緒池能夠帶來3個好處。

  • 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,
    還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控。

1. 執行緒池的使用原理

從圖中可以看出,當提交一個新任務到執行緒池時,執行緒池的處理流程如下。

  1. 執行緒池判斷核心執行緒池裡的執行緒是否都在執行任務。如果不是,則建立一個新的工作
    執行緒來執行任務。如果核心執行緒池裡的執行緒都在執行任務,則進入下個流程。
  2. 執行緒池判斷工作佇列是否已經滿。如果工作佇列沒有滿,則將新提交的任務儲存在這
    個工作佇列裡。如果工作佇列滿了,則進入下個流程。
  3. 執行緒池判斷執行緒池的執行緒是否都處於工作狀態。如果沒有,則建立一個新的工作執行緒
    來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

在這裡插入圖片描述

ThreadPool的執行示意圖:
在這裡插入圖片描述

ThreadPoolExecutor執行execute方法分下面4種情況。

  1. 如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟需要獲取全域性鎖)。
  2. 如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。
  3. 如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執行這一步驟需要獲取全域性鎖)。
  4. 如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫
    RejectedExecutionHandler.rejectedExecution()方法。

下面我們從原始碼的角度分析一下ThreadPoolExecutor的execute()方法:

 public void execute(Runnable command) {
        if (command == null)
            throw
new NullPointerException(); int c = ctl.get(); // 如果執行緒數小於基本執行緒數,則建立執行緒並執行當前任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如執行緒數大於等於基本執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //丟擲RejectedExecutionException異常 reject(command); //如果執行緒池不處於執行中或任務無法放入佇列中,並且當前執行緒的數量小於最大允許的執行緒數量,則建立一個工作執行緒執行任務。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //丟擲RejectedExecutionException異常 reject(command); }

工作執行緒:執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務
後,還會迴圈獲取工作佇列裡的任務來執行。我們可以從Worker類的run()方法裡看到這點。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

ThreadPoolExecutor中執行緒執行任務的示意圖 如下:
在這裡插入圖片描述

2. 執行緒池的使用

2.1 執行緒池的建立

我們可以通過ThreadPoolExecutor來建立一個執行緒池。

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,milliseconds,runnableTaskQueue, handler);

建立一個執行緒池時需要輸入幾個引數,如下。

  • corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。
  • runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。
    • ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列
    • LinkedBlockingQueue:一個基於連結串列結構的有界阻塞佇列
    • SynchronousQueue:一個不儲存元素的阻塞佇列。
    • PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
  • maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。
  • ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder
  • RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。
    • AbortPolicy: 直接丟擲異常
    • CallerRunsPolicy:只用呼叫者所線上程來執行任務
    • DiscardOldestPolicy:丟棄最近的一個任務,並執行當前執行緒。
    • DiscardPolicy:不處理丟掉
  • keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。
  • TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒

2.2 向執行緒池提交任務

可以使用兩個方法向執行緒池提交任務,分別為execute()和submit()方法。

  1. execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。
  2. submit()方法用於提交需要返回值的任務。執行緒池會返回一個future型別的物件,通過這個future物件可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。

2.3 關閉執行緒池

關閉執行緒池有兩種方式,一種是呼叫shutdown()方法還有一種是呼叫shutdownNow()方法來關閉執行緒池。它們的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。

但是他們之間也存在一些區別:

  • shutdownNow首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表
  • shutdown只是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。

當呼叫shutdown()和shutdownNow()方法之後,執行緒池執行isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。

2.4 合理地配置執行緒池

要合理配置執行緒池,需要先分析任務的特性:

  1. 任務的性質:CPU密集型任務、IO密集型任務 和 混合型任務。
  2. 優先順序:高、中、低。
  3. 執行時間:長、中、短。
  4. 任務的依賴性:是否依賴其他的系統資源,如資料庫連線等。

配置方案

  • 根據任務性質,可做如下配置:

    • CPU密集型任務

      儘量使用小規模執行緒池,一般執行緒數量:CPU核心數 + 1。

      因為CPU密集型任務的CPU利用率很高,過多執行緒導致上下文切換過多,造成額外開銷。

    • IO密集型任務

      使用稍大的執行緒池,儘量配置多的執行緒,一般執行緒配置數量:CPU核心數 * 2。

      這種任務CPU利用率不是很高,因此可以讓CPU在等待IO的時候去處理別的任務。

    • 混合型任務

      任務分解成:CPU密集 和 IO密集 型任務,然後分別用不同規模執行緒池處理。

      注意:只要分解後的兩個任務執行時間相差不大,會比原來序列執行吞吐量高。

      但是,如果相差時間不大,先執行完的任務 要等待 後執行完的任務,最終執行時間依然取決於 後執行完的任務,還要加上任務拆分、合併的開銷,得不償失。

  • 優先順序,可以使用優先順序佇列PriorityBlockingQueue來處理,它會讓優先順序高的任務先執行。

  • 執行時間不同的任務,可以交給不同規模的執行緒池來處理,或使用優先順序佇列,讓短任務先執行。

  • 資料庫依賴性任務,執行緒提交SQL等待資料庫返回結果,等待時間越長,則CPU空閒時間越長。因此,可以將執行緒數量設定較大,能更好利用CPU。

使用執行緒池建議:建議使用有界佇列,可以增加系統穩定性 和 預警能力。

3.5 執行緒池的監控

如果系統大量使用執行緒池,則有必要對執行緒池進行監控。

可以通過執行緒池提供的引數進行監控

  • taskCount:需要執行的任務數量
  • completedTaskCount:已完成的任務數量
  • largestPoolSize:執行緒池中曾經建立過的最大執行緒數量
  • getPoolSize:執行緒池的執行緒數量,只增不減
  • getActiveCount:獲取活動的執行緒數量

也可繼承執行緒池來自定義執行緒池進行監控,重寫beforeExecute()afterExecute()terminated()方法進行監控。