1. 程式人生 > 實用技巧 >執行緒池的實現原理分析

執行緒池的實現原理分析

什麼是執行緒池

  在 Java 中,如果每個請求到達就建立一個新執行緒,建立和銷燬執行緒花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的使用者請求的時間和資源要多的多。如果在一個 Jvm 裡建立太多的執行緒,可能會使系統由於過度消耗記憶體或“切換過度”而導致系 統資源不足。   為了解決這個問題,就有了執行緒池的概念,執行緒池的核心邏輯是提前建立好若干個執行緒放在一個容器中。如果有任務需要處理,則將任務直接分配給執行緒池中的執行緒來執行就行,任務處理完以後這個執行緒不會被銷燬,而是等待後續分配任務。同時通過執行緒池來重複管理執行緒還可以避免建立大量執行緒增加開銷。

執行緒池的優勢

  合理的使用執行緒池,可以帶來一些好處 1. 降低建立執行緒和銷燬執行緒的效能開銷 2. 提高響應速度,當有新任務需要執行是不需要等待執行緒建立就可以立馬執行 3. 合理的設定執行緒池大小可以避免因為執行緒數超過硬體資源瓶頸帶來的問題

Java 中提供的執行緒池 API

  相信有很多同學或多或少都接觸過執行緒池,也可能自己也研究過執行緒池的原理。但是要想合理的使用執行緒池,那麼勢必要對執行緒池的原理有比較深的理解。

執行緒池的使用

  要了解一個技術,我們仍然是從使用開始。JDK 為我們提供了幾種不同的執行緒池實現。我們先來通過一個簡單的案例來引入執行緒池的基本使用。   在 Java 中怎麼建立執行緒池呢?下面這段程式碼演示了建立三個固定執行緒數的執行緒池。
public class Test implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}

static ExecutorService service = Executors.newFixedThreadPool(3);

public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
service.execute(new Test());
}
service.shutdown();
}
}

Java 中提供的執行緒池 Api

  為了方便大家對於執行緒池的使用,在 Executors 裡面提供了幾個執行緒池的工廠方法,這樣,很多新手就不需要了解太多關於 ThreadPoolExecutor 的知識了,他們只需要直接使用Executors 的工廠方法, 就可以使用執行緒池: newFixedThreadPool:該方法返回一個固定數量的執行緒池,執行緒數不變,當有一個任務提交時,若執行緒池中空閒,則立即執行,若沒有,則會被暫緩在一個任務佇列中,等待有空閒的執行緒去執行。 newSingleThreadExecutor: 建立一個執行緒的執行緒池,若空閒則執行,若沒有空閒執行緒則暫緩在任務佇列中。 newCachedThreadPool:返回一個可根據實際情況調整執行緒個數的執行緒池,不限制最大執行緒數量,若用空閒的執行緒則執行任務,若無任務則不建立執行緒。並且每一個空閒執行緒會在 60 秒後自動回收 newScheduledThreadPool: 建立一個可以指定執行緒的數量的執行緒池,但是這個執行緒池還帶有延遲和週期性執行任務的功能,類似定時器。

ThreadpoolExecutor

  上面提到的四種執行緒池的構建,都是基於 ThreadpoolExecutor 來構建的,小夥伴們打起精神來了,接下來將一起了解一下面試官最喜歡問到的一道面試題“請簡單說下你知道的執行緒池和ThreadPoolThread 有哪些構造引數”
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
  ThreadpoolExecutor 有多個過載的構造方法,我們可以基於它最完整的構造方法來分析先來解釋一下每個引數的作用,稍後我們在分析原始碼的過程中再來詳細瞭解引數的意義。
public ThreadPoolExecutor(int corePoolSize, //核心執行緒數量
 int maximumPoolSize, //最大執行緒數
 long keepAliveTime, //超時時間,超出核心執行緒數量以外的執行緒空餘存活時間
 TimeUnit unit, //存活時間單位
 BlockingQueue<Runnable> workQueue, //儲存執行任務的佇列
ThreadFactory threadFactory,//建立新執行緒使用的工廠
RejectedExecutionHandler handler //當任務無法執行的時候的處理方式)
  執行緒池裡的執行緒的初始化與其他執行緒一樣,但是在完成任務以後,該執行緒不會自行銷燬,而是以掛起的狀態返回到執行緒池。直到應用程式再次向執行緒池發出請求時,執行緒池裡掛起的執行緒就會再度啟用執行任務。這樣既節省了建立執行緒所造成的效能損耗,也可以讓多個任務反覆重用同一執行緒,從而在應用程式生存期內節約大量開銷。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>());
}
  FixedThreadPool 的核心執行緒數和最大執行緒數都是指定值,也就是說當執行緒池中的執行緒數超過核心執行緒數後,任務都會被放到阻塞佇列中。另外 keepAliveTime 為 0,也就是超出核心執行緒數量以外的執行緒空餘存活時間而這裡選用的阻塞佇列是 LinkedBlockingQueue,使用的是預設容量 Integer.MAX_VALUE,相當於沒有上限 這個執行緒池執行任務的流程如下: 1. 執行緒數少於核心執行緒數,也就是設定的執行緒數時,新建執行緒執行任務2. 執行緒數等於核心執行緒數後,將任務加入阻塞佇列 3. 由於佇列容量非常大,可以一直新增 4. 執行完任務的執行緒反覆去佇列中取任務執行 用途:FixedThreadPool 用於負載比較大的伺服器,為了資源的合理利用,需要限制當前執行緒數量

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
}
  CachedThreadPool 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒; 並且沒有核心執行緒,非核心執行緒數無上限,但是每個空閒的時間只有 60 秒,超過後就會被回收。 它的執行流程如下: 1. 沒有核心執行緒,直接向 SynchronousQueue 中提交任務 2. 如果有空閒執行緒,就去取出任務執行;如果沒有空閒執行緒,就新建一個 3. 執行完任務的執行緒有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收

newSingleThreadExecutor

  建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行

執行緒池的實現原理分析

  執行緒池的基本使用我們都清楚了,接下來我們來了解一下執行緒池的實現原理。   ThreadPoolExecutor 是執行緒池的核心,提供了執行緒池的實現。ScheduledThreadPoolExecutor 繼承了 ThreadPoolExecutor,並另外提供一些排程方法以支援定時和週期任務。Executers 是工具類,主要用來建立執行緒池物件 我們把一個任務提交給執行緒池去處理的時候,執行緒池的處理過程是什麼樣的呢?首先直接來看看定義

執行緒池原理分析(FixedThreadPool)

原始碼分析

execute   基於原始碼入口進行分析,先看 execute 方法:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//1.當前池中執行緒比核心數少,新建一個執行緒執行任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務佇列未滿,新增到佇列中
            int recheck = ctl.get();
            //任務成功新增到佇列以後,再次檢查是否需要新增新的執行緒,因為已存在的執行緒可能被銷燬了
            if (!isRunning(recheck) && remove(command))
                reject(command);//如果執行緒池處於非執行狀態,並且把當前的任務從任務佇列中移除成功,則拒絕該任務
            else if (workerCountOf(recheck) == 0)//如果之前的執行緒已被銷燬完,新建一個執行緒
                addWorker(null, false);
        } else if (!addWorker(command, false)) //3.核心池已滿,佇列已滿,試著建立一個新執行緒
            reject(command); //如果建立新執行緒失敗了,說明執行緒池被關閉或者執行緒池完全滿了,拒絕任務
    }
addWorker   如果工作執行緒數小於核心執行緒數的話,會呼叫 addWorker,顧名思義,其實就是要建立一個工作執行緒。我們來看看原始碼的實現原始碼比較長,看起來比較唬人,其實就做了兩件事。 1)呼叫迴圈 CAS 操作來將執行緒數加 1; 2)新建一個執行緒並啟用。

拒絕策略

1、AbortPolicy:直接丟擲異常,預設策略; 2、CallerRunsPolicy:用呼叫者所在的執行緒來執行任務; 3、DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務; 4、DiscardPolicy:直接丟棄任務; 當然也可以根據應用場景實現 RejectedExecutionHandler 介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務