1. 程式人生 > >Java多執行緒總結(6)— 執行緒池的基本使用和執行流程分析

Java多執行緒總結(6)— 執行緒池的基本使用和執行流程分析

1 執行緒池的實現原理及基本類結構

  合理利用執行緒池能夠帶來三個好處。

  1. 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
  2. 提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
  3. 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。

  Executor執行緒池框架的最大優點是把任務的提交和執行解耦。客戶端將要執行的任務封裝成Task,然後提交即可。而Task如何執行客戶端則是透明的。具體點講,提交一個Callable物件給ExecutorService(如最常用的執行緒池ThreadPoolExecutor),將得到一個Future物件,呼叫Future物件的get方法等待執行結果。
  下圖是執行緒池所涉及到的所有類的結構圖(右鍵檢視大圖),先從整體把握下:
這裡寫圖片描述


              圖1 執行緒池實現原理類結構圖

  上面這個圖是很複雜的,涉及到了執行緒池內部實現原理的所有類,不利於我們理解執行緒池如何使用。我們先從客戶端的角度出發,看看客戶端使用執行緒池所涉及到的類結構圖:
這裡寫圖片描述
              圖2 執行緒池使用的基本類結構圖

  從圖一可知,實際的執行緒池類是實現ExecutorService介面的類,有ThreadPoolExecutor、ForkJoinPool和ScheduledThreadPoolExecutor。下面以常用的ThreadPoolExecutor為例講解。

2 執行緒池實現步驟

2.1 執行緒池的建立

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

  引數說明:

  • corePoolSize(執行緒池的基本執行緒數): the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set.當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。
  • maximumPoolSize(執行緒池最大執行緒數): the maximum number of threads to allow in the pool.執行緒池允許建立的最大執行緒數。如果任務佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。
  • keepAliveTime(執行緒活動保持時間):when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
  • TimeUnit(執行緒活動保持時間的單位):the time unit for the keepAliveTime argument. 可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue(任務佇列):the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.用於儲存等待執行的任務的阻塞佇列。 可以選擇以下幾個阻塞佇列。
      ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
      LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
      SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
      PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
  • ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。
  • RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略。
      AbortPolicy:直接丟擲異常。
      CallerRunsPolicy:只用呼叫者所線上程來執行任務。
      DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
      DiscardPolicy:不處理,丟棄掉。
      當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。

  由此可見,建立一個執行緒所需的引數很多,執行緒池為我們提供了類Executors的靜態工廠方法以建立不同型別的執行緒池。
- newFixedThreadPool可以生成固定大小的執行緒池;
- newCachedThreadPool可以生成一個無界、可以自動回收的執行緒池;
- newSingleThreadScheduledExecutor可以生成一個單個執行緒的執行緒池;
- newScheduledThreadPool還可以生成支援週期任務的執行緒池。

2.2 向執行緒池提交任務

  有兩種方式提交任務:
1.使用void execute(Runnable command)方法提交任務
execute方法返回型別為void,所以沒有辦法判斷任務是否被執行緒池執行成功。

Runnable task = new Runnable() {
    @Override
    public void run() {
        System.out.println("Task is running by " + Thread.currentThread().getName());
        System.out.println("執行緒池正在執行的執行緒數:" + threadPoolExecutor.getActiveCount());
    }
};
threadPool.execute(task);

2.使用submit方法提交任務
Future<?> submit(Runnable task);<T> Future<T> submit(Runnable task, T result);Future<T> submit(Callable<T> task);會返回一個Future,可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。

Future<?> future = threadPool.submit(task);
try {
    Object result = future.get();
    System.out.println("任務是否完成:" + future.isDone());
    System.out.println("返回的結果為:" + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    // 關閉執行緒池
    threadPool.shutdown();
}

2.3 執行緒池關閉

  1. shutdown()方法:這個方法會平滑地關閉ExecutorService,當我們呼叫這個方法時,ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉ExecutorService。
  2. awaitTermination(long timeout, TimeUnit unit)方法:這個方法有兩個引數,一個是timeout即超時時間,另一個是unit即時間單位。這個方法會使當前關閉執行緒池的執行緒等待timeout時長,當超過timeout時間後,則去監測ExecutorService是否已經關閉,若關閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。
  3. shutdownNow()方法:這個方法會強制關閉ExecutorService,它將取消所有執行中的任務和在工作佇列中等待的任務,這個方法返回一個List列表,列表中返回的是等待在工作佇列中的任務。
// 4. 關閉執行緒池
threadPool.shutdown();
// hreadPool.shutdownNow();
System.out.println("執行緒池是否關閉:" + threadPool.isShutdown());
try {
    //當前執行緒阻塞10ms後,去檢測執行緒池是否終止,終止則返回true
    while(!threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
        System.out.println("檢測執行緒池是否終止:" + threadPool.isTerminated());
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println("執行緒池是否終止:" + threadPool.isTerminated());

  完整案例:

package com.markliu.concurrent.threadpool;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo1 {

    public static void main(String[] args) {
        /*
         * 1. 建立執行緒池
         * 
         * 建立一個固定執行緒數目的執行緒池。corePoolSize = maximumPoolSize = 5
         * 即執行緒池的基本執行緒數和最大執行緒數相等。
         * 相當於:
         * new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
         */
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        /*
         *  2. 封裝任務並提交給執行緒池
         */
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) threadPool;
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println("Task is running by " + Thread.currentThread().getName());
                System.out.println("執行緒池正在執行的執行緒數:" + threadPoolExecutor.getActiveCount());
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        /*
         * Starts all core threads, causing them to idly wait for work. 
         * This overrides the default policy of starting core threads 
         * only when new tasks are executed.
         */
        int count = threadPoolExecutor.prestartAllCoreThreads();
        System.out.println("開啟的所有core執行緒數:" + count);
        System.out.println("執行緒池當前執行緒數:" + threadPoolExecutor.getPoolSize());
        System.out.println("執行緒池的core number of threads:" + threadPoolExecutor.getCorePoolSize());
        System.out.println("執行緒池中的最大執行緒數:" + threadPoolExecutor.getLargestPoolSize());
        // 3. 執行,獲取返回結果
        /**
         * execute方式提交任務
         */
        // threadPool.execute(task);
        /**
         * submit方式提交任務
         */
        Future<?> future = threadPool.submit(task);
        try {
            // 阻塞,等待執行緒執行完成,並獲得結果
            Object result = future.get();
            System.out.println("任務是否完成:" + future.isDone());
            System.out.println("返回的結果為:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            System.out.println("執行緒池中已經執行完的任務數:" + threadPoolExecutor.getCompletedTaskCount());
            // 4. 關閉執行緒池
            /*
             * shutdown方法平滑地關閉執行緒池,將執行緒池的狀態設為:SHUTDOWN
             * 停止接受任何新的任務且等待已經提交的任務執行完成,當所有已經
             * 提交的任務執行完畢後將會關閉執行緒池
             */
            threadPool.shutdown();
            /*
             *  shutdownNow方法強制關閉執行緒池,將執行緒池狀態設定為:STOP
             *  取消所有執行中的任務和在工作佇列中等待的任務,並返回所有未執行的任務List
             */
            // hreadPool.shutdownNow();
            System.out.println("執行緒池是否關閉:" + threadPool.isShutdown());
            try {
                //當前執行緒阻塞10ms後,去檢測執行緒池是否終止,終止則返回true
                while(!threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                    System.out.println("檢測執行緒池是否終止:" + threadPool.isTerminated());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("執行緒池是否終止:" + threadPool.isTerminated());
        }
    }
}

3 執行緒池的執行流程分析

  執行緒池的主要工作流程如下圖:
    這裡寫圖片描述
  當提交一個新任務到執行緒池時,執行緒池的處理流程如下:
- 首先執行緒池判斷“基本執行緒池”(corePoolSize)是否已滿?沒滿,建立一個工作執行緒來執行任務。滿了,則進入下個流程。
- 其次執行緒池判斷工作佇列(workQueue)是否已滿?沒滿,則將新提交的任務儲存在工作佇列裡。滿了,則進入下個流程。
- 最後執行緒池判斷整個執行緒池的執行緒數是否已超過maximumPoolSize?沒滿,則建立一個新的工作執行緒來執行任務,滿了,則交給拒絕策略來處理這個任務。
(我的理解:提交任務—>如果執行緒數未達到corePoolSize,則建立執行緒執行任務—>如果達到corePoolSize,仍讓提交了任務,則會有任務等待,所以將任務儲存在任務佇列中,直到任務佇列workQueue已滿—>如果workQueue已滿,仍然有任務提交,但未達到最大執行緒數,則繼續建立執行緒執行任務,直到執行緒數達到maximumPoolSize,如果達到了maximumPoolSize,則根據飽和策略拒絕該任務。這也就解釋了為什麼有了corePoolSize還有maximumPoolSize的原因。)
  關於執行緒池的工作原理後期從原始碼分析。

4 執行緒池的監控

  通過執行緒池提供的引數進行監控:
- taskCount:執行緒池需要執行的任務數量。
- completedTaskCount:執行緒池在執行過程中已完成的任務數量。小於或等於taskCount。
- largestPoolSize:執行緒池曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否滿過。如等於執行緒池的最大大小,則表示執行緒池曾經滿了。
- getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,池裡的執行緒不會自動銷燬,所以這個大小隻增不減
- getActiveCount:獲取活動的執行緒數。

  通過繼承執行緒池並重寫執行緒池的beforeExecute,afterExecute和terminated方法,可以在任務執行前,執行後和執行緒池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。

    /**
     * 給定的Thread執行Runnable之前呼叫此方法
     *
     * @param t the thread that will run task {@code r}
     * @param r the task that will be executed
     */
    protected void beforeExecute(Thread t, Runnable r) { }
    /**
     * 給定的Runnable完成後執行此方法
     * This method is invoked by the thread that executed the task. 
     * @param r the runnable that has completed
     * @param t the exception that caused termination, or null if
     * execution completed normally
     */
    protected void afterExecute(Runnable r, Throwable t) { }
    /**
     * 當Executor終止時呼叫此方法.  
     * 注意:方法中子類應呼叫super.terminated()
     */
    protected void terminated() { }

  例如:

class ExtendedExecutor extends ThreadPoolExecutor {
      // ...
      protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
          try {
            Object result = ((Future<?>) r).get();
          } catch (CancellationException ce) {
              t = ce;
          } catch (ExecutionException ee) {
              t = ee.getCause();
          } catch (InterruptedException ie) {
              Thread.currentThread().interrupt(); // ignore/reset
          }
        }
        if (t != null)
          System.out.println(t);
      }
}