1. 程式人生 > >幾種執行緒池的實現演算法分析

幾種執行緒池的實現演算法分析

1. 前言

在閱讀研究執行緒池的原始碼之前,一直感覺執行緒池是一個框架中最高深的技術。研究後才發現,執行緒池的實現是如此精巧。本文從技術角度分析了執行緒池的本質原理和組成,同時分析了JDK、Jetty6、Jetty8、Tomcat的原始碼實現,對於想了解執行緒池本質、更好的使用執行緒池或者定製實現自己的執行緒池的業務場景具有一定指導意義。

2. 使用執行緒池的意義

  • 複用:類似WEB伺服器等系統,長期來看內部需要使用大量的執行緒處理請求,而單次請求響應時間通常比較短,此時Java基於作業系統的本地呼叫方式大量的建立和銷燬執行緒本身會成為系統的一個性能瓶頸和資源浪費。若使用執行緒池技術可以實現工作執行緒的複用,即一個工作執行緒建立和銷燬的生命週期期間內可以執行處理多個任務,從而總體上降低執行緒建立和銷燬的頻率和時間,提升了系統性能。
  • 流控:伺服器資源有限,超過伺服器效能的過高併發設定反而成為系統的負擔,造成CPU大量耗費於上下文切換、記憶體溢位等後果。通過執行緒池技術可以控制系統最大併發數和最大處理任務量,從而很好的實現流控,保證系統不至於崩潰。
  • 功能:JDK的執行緒池實現的非常靈活,並提供了很多功能,一些場景基於功能的角度會選擇使用執行緒池。

3. 執行緒池技術要點:

從內部實現上看,執行緒池技術可主要劃分為如下6個要點實現:


工作者執行緒worker:即執行緒池中可以重複利用起來執行任務的執行緒,一個worker的生命週期內會不停的處理多個業務job。執行緒池“複用”的本質就是複用一個worker去處理多個job,“流控“的本質就是通過對worker數量的控制實現併發數的控制。通過設定不同的引數來控制worker的數量可以實現執行緒池的容量伸縮從而實現複雜的業務需求圖1執行緒池技術要點

  • 待處理工作job的儲存佇列:工作者執行緒workers的數量是有限的,同一時間最多隻能處理最多workers數量個job。對於來不及處理的job需要儲存到等待佇列裡,空閒的工作者work會不停的讀取空閒佇列裡的job進行處理。基於不同的佇列實現,可以擴展出多種功能的執行緒池,如定製隊列出隊順序實現帶處理優先順序的執行緒池、定製佇列為阻塞有界佇列實現可阻塞能力的執行緒池等。流控一方面通過控制worker數控制併發數和處理能力,一方面可基於佇列控制執行緒池處理能力的上限。
  • 執行緒池初始化:即執行緒池引數的設定和多個工作者workers的初始化。通常有一開始就初始化指定數量的workers或者有請求時逐步初始化工作者兩種方式。前者執行緒池啟動初期響應會比較快但造成了空載時的少量效能浪費,後者是基於請求量靈活擴容但犧牲了執行緒池啟動初期效能達不到最優。
  • 處理業務job演算法:業務給執行緒池新增任務job時執行緒池的處理演算法。有的執行緒池基於演算法識別直接處理job還是增加工作者數處理job或者放入待處理佇列,也有的執行緒池會直接將job放入待處理佇列,等待工作者worker去取出執行。
  • workers的增減演算法:業務執行緒數不是持久不變的,有高低峰期。執行緒池要有自己的演算法根據業務請求頻率高低調節自身工作者workers的數量來調節執行緒池大小,從而實現業務高峰期增加工作者數量提高響應速度,而業務低峰期減少工作者數來節省伺服器資源。增加演算法通常基於幾個維度進行:待處理工作job數、執行緒池定義的最大最小工作者數、工作者閒置時間。

執行緒池終止邏輯:應用停止時執行緒池要有自身的停止邏輯,保證所有job都得到執行或者拋棄。

4. 幾種執行緒池的實現細節

結合上面的技術點,列舉幾種執行緒池實現方式。

  • 工作者workers與待處理工作佇列實現方式舉例:

    實現

    工作者workers結構與併發保護

    待處理工作佇列結構

    JDK

    使用了HashSet來儲存工作者workers,通過可重入鎖ReentrantLock對其進行併發保護。每個worker都是一個Runnable介面。

    使用了實現介面BlockingQueue的阻塞佇列來儲存待處理工作job,並把佇列作為建構函式引數,從而實現業務可以靈活的擴充套件定製執行緒池的佇列。業務也可使用JDK自身的同步阻塞佇列SynchronousQueue、有界佇列ArrayBlockingQueue、無界佇列LinkedBlockingQueue、優先順序佇列PriorityBlockingQueue。

    Jetty6

    同樣使用了HashSet儲存工作者workers,通過synchronized一個物件進行HashSet的併發保護。每個工作者實際上是一個Thread的擴充套件。

    使用了陣列儲存待處理的job物件Runnable。陣列初始化容量為_maxThreads個,使用變數_queued計算儲存當前內部待處理job的個數即陣列length。超過陣列最大值時,擴大_maxThreads個容量,因此陣列永遠夠用夠大,容量無界。同樣是用synchronized一個物件的方式實現同步。

    Jetty8

    使用了ConcurrentLinkedQueue儲存工作者workers,利用JDK基於CAS演算法的實現提高了併發效率,同時也降低了執行緒池併發保護的複雜程度。針對佇列ConcurrentLinkedQueue無法保證size()實時性問題引入原子變數AtomicInteger統計工作者數量。

    與JDK相同實現,使用了基於介面BlockingQueue的阻塞佇列來儲存待處理工作job,也支援線上程池建構函式的引數中傳入佇列型別。同時,Jetty8內部預設未設定佇列型別場景可自動設定使用2種佇列:有界無法擴容的ArrayBlockingQueue及Jetty自身定製擴充套件實現的可擴容佇列BlockingArrayQueue。

    Tomcat

    基於JDK的ThreadPoolExecutors實現,複用JDK業務

    複用JDK業務

  • 執行緒池初始化與處理業務job演算法舉例:

    實現

    執行緒池構造與工作者初始化

    處理業務job的演算法

    JDK

    1. 基於多個構造引數實現靈活初始化,幾個核心引數如下:

    corePoolSize:核心工作者數

    maximumPoolSize:最大工作者數

    keepAliveTime:超過核心工作者數時閒置工作者的存活時間。

    workQueue:待處理job佇列,即前面提到的BlockingQueue介面。

    2. 預設初始化後不啟動工作者,等待有請求時才啟動。可以通過呼叫執行緒池介面提前啟動核心工作數個工作者執行緒,也可以啟動業務期望的多個工作者執行緒。

    1. 工作者workers數量低於核心工作者數corePoolSize時會優先建立一個工作者worker處理job,處理成功則返回。

    2. 工作者workers數量高於核心工作者數時會優先把job放入到待處理佇列,放入佇列成功時處理結束。

    3. 步驟2中入隊失敗會識別工作者數是否還小於最大工作者數maximumPoolsize,小於的話也會新建立一個工作者worker處理job。

    4. 拒絕處理

    Jetty6

    1. 同樣支援設定多個引數:

    _spawnOrShrinkAt:擴容/縮容閥值

    _minThreads:最小工作者數

    _maxThreads:最大工作者數

    _maxIdleTimeMs:閒置工作者最大閒置超時時間

    2. 初始化後直接啟動_minThreads個工作者執行緒

    1. 查詢閒置的工作者worker,找到則派發job。

    2. 沒有閒置的工作者,將job存入待處理陣列。

    3. 當識別到陣列中待處理job超過擴容閥值引數時,擴容增加工作者處理job

    4. 否則不處理

    Jetty8

    1. 配置引數類似Jetty6,去除了_spawnOrShrinkAt閥值引數。

    2. 初始化後直接啟動_minThreads個工作者執行緒

    非常簡單,直接將待處理job入隊。

    Tomcat

    1. 基於JDK執行緒池的構造方法

    2. 來請求時啟動工作者

    處理方法複用JDK的,但是在開始提交前擴充套件了JDK的功能,實現了可以統計提交數submittedCount的能力

  • 執行緒池工作者worker的增減機制舉例:

    實現

    工作者增加演算法

    工作者減少演算法

    JDK

    1. 待處理job來時,工作者workers數量低於核心工作者數corePoolSize時。

    2. 待處理job來時,workers數超過核心數小於最大工作者數且入待處理佇列失敗場景。

    3. 業務呼叫執行緒池的更新核心工作者數介面時,若發現擴容,會增加工作者數。

    1. 待處理任務佇列裡沒有job並且工作者workers數量超過了核心工作者數corePoolSize。

    2. 待處理任務佇列裡沒有job並且允許工作者數量小於核心工作者引數為true,此場景會至少保留一個工作者執行緒。

    Jetty6

    1. 啟動執行緒池時會啟動_minThreads個工作者執行緒

    2. 待處理的job數量高於了閥值引數且工作者數沒有達到最大值時會增加工作者。

    3. 呼叫執行緒池介面setMinThreads更新最小工作者數時會根據需要增加工作者。

    如下三個條件同時滿足時會減少工作者:

    1. 待處理任務陣列中沒有待處理job

    2. 工作者workers數量超過了最小工作者數_minThreads

    3. 閒置工作者執行緒數高於了閥值引數

    Jetty8

    1. 啟動執行緒池時啟動最小工作者引數個工作者執行緒

    2. 已經沒有閒置工作者或者閒置工作者的數量已經小於待處理的job的總數

    3. 呼叫執行緒池介面setMinThreads更新最小工作者數時

    如下三個條件同時滿足時會減少工作者:

    1. 待處理任務佇列裡沒有待處理的job

    2. 工作者workers總數超過了最小工作者引數配置_minThreads

    3. 工作者執行緒的閒置時間超時

    Tomcat

    同JDK增加工作者演算法

    複用JDK減少演算法,同時定製擴充套件延遲引數,超過引數時,直接丟擲異常到外面來終止執行緒池工作者。

5. 小結

對比幾種執行緒池實現,JDK的實現是最為靈活、功能最強且擴充套件性最好的,Tomcat即基於JDK執行緒池功能擴充套件實現,複用原有業務的同時擴充了自己的業務。Jetty6是完全自己定製的執行緒池業務,耦合執行緒池眾多複雜的業務邏輯到執行緒池類裡面,邏輯相對最為複雜,擴充套件性也非常差。Jetty8相對Jetty6的實現簡化了很多,其中利用了JDK中的同步容器和原子變數,同時實現方式也越來越接近JDK。

6. 參考原始碼

  • JDK原始碼類:java.util.concurrent.ThreadPoolExecutor
  • Jetty6原始碼類:org.mortbay.thread.QueuedThreadPool
  • Jetty8原始碼類:org.eclipse.jetty.util.thread.QueuedThreadPool
  • Tomcat原始碼類:org.apache.tomcat.util.threads.ThreadPoolExecutor
原文:http://www.infoq.com/cn/articles/thread-pool-algorithm-realization?utm_source=infoq&utm_medium=popular_links_homepage