1. 程式人生 > >Java執行緒之執行緒池

Java執行緒之執行緒池

Executor框架可以將任務的提交與任務的執行策略解耦開來。就像許多對複雜過程的解耦操作那樣,這種論斷多少有些言過其實了。雖然Executor框架為定製和修改執行策略提供了相當大的靈活性,但並非所有的任務都能適用所有的執行策略。有些型別的任務需要明確的指定執行策略。

依賴性任務 使用執行緒封閉機制的任務 對相應時間敏感的任務 使用ThreadLocal的任務
大多數行為正確的任務都是獨立的:它們不依賴於其他任務的執行時序,執行結果或其他效果。當執行緒池中執行獨立任務時,可以隨意地改變執行緒池的大小和配置,這些修改只會對執行效能產生影響。然而如果提交給執行緒池的任務需要依賴其他任務,那麼就隱含地給執行策略帶來約束,此時必須小心的維持這些執行策略以避免產生活躍性問題 與執行緒池相比,單執行緒的Executor能夠對併發性做出更強的承諾。它們能夠確保任務不會併發的執行,使你能夠放寬程式碼對執行緒安全的要求。物件可以封閉在任務執行緒中,使得該執行緒中執行的任務在訪問該物件時不需要同步。即使這些資源不是執行緒安全的也沒有問題。這種情形將在任務與執行策略之間形成隱式的耦合——任務要求其執行所在的Executor是單執行緒的。如果將Executor從單執行緒環境改變為執行緒池環境,那麼將會失去執行緒安全性。 GUI應用程式對於相應時間是敏感的:如果使用者在點選按鈕後需要很長延遲才能得到可見的反饋,那麼他們會感到不滿。如果將一個執行時間較長的任務提交到單執行緒的Executor
中,或者將多個執行時間較長的任務提交到一個只包含少量執行緒的執行緒池中,那麼將降低由該Executor管理的服務的響應性
ThreadLocal使每個執行緒都可以擁有某個變數的一個私有“版本”。然而,只要條件允許,Executor可以自由地重用這些執行緒。在標準的Executor實現中,當執行需求較低時將回收空閒執行緒,而當需求增加時將新增新的執行緒,並且如果從任務中拋了一個未受檢查的異常,那麼將一個新的工作者執行緒來替代丟擲異常的執行緒。只有當執行緒本地值得生命週期受限於任務的生命週期時,線上程池的執行緒中使用ThreadLocal才有意義,而線上程池的執行緒中不應該使用ThreadLocal
在任務之間傳值

 只有當任務都是同類型的並且相互獨立時,執行緒池的效能才能達到最佳。如果將執行時間較長的任務與執行時間較短的任務混合在一起,那麼除非執行緒池很大,否則有可能造成“擁塞”。如果提交的任務依賴於其他任務,那麼除非執行緒池無限大,否則有可能造成死鎖。幸運的是,在基於網路的典型伺服器應用程式中——網頁伺服器、郵件伺服器以及檔案伺服器等,它們的請求通常都是同類型的並且相互獨立的。

在一些任務中,需要擁有或排除某種特定的執行策略。如果某些任務依賴於其他的任務,那麼將會要求執行緒池足夠大,從而確保它們依賴任務不會被放入等待佇列中或被拒絕,而採用執行緒封閉機制的任務需要序列執行。通過將這些需求寫入文件,將來的程式碼維護人員就不會由於使用可某種不合適的執行策略而破壞安全性或活躍性。

執行緒飢餓死鎖

 線上程池中,如果任依賴於其他任務,那麼可能產生死鎖。在單執行緒的Executor中,如果一個任務將另一個任務提交到同一個Executor,並且等待這個被提交任務的結果,那麼通常會引發死鎖。第二個任務停留在工作佇列中,並等待第一個任務的完成,而第一個人任務又無法完成,因為他在等待第二個任務的完成。在更大的執行緒池中,如果所有正在執行任務的執行緒都由與等待其他仍處於工作佇列中的任務而阻塞,那麼將會發生同樣的問題。這種現象被稱為執行緒飢餓死鎖,只要執行緒池中的任務需要無限地等待一些必須由池中其他任務才能提供的資源或條件,例如在某個任務等待另一個任務返回值或者執行結果,那麼除非執行緒池足夠大,否則將會發生執行緒飢餓死鎖。

每當提交了一個有依賴性的Executor任務時,要清楚地知道可能會出現執行緒“飢餓”死鎖,因此需要在程式碼或配置Executor的配置檔案中記錄執行緒池的大小限制或配置限制。

 除了線上程池大小上的顯示限制外,還可能由於其他資源上的約束而存在一些隱式限制。如果應用程式使用一個包含10個連線的JSBC連線池,並且每個任務需要一個數據庫連線,那麼執行緒池就好像只有10個執行緒,因為當超過10個任務時,新的任務需要等待其他任務釋放連線。

執行時間較長的任務

 如果任務阻塞的時間過長,那麼即使不出現死鎖,執行緒池的響應性也會變得糟糕。執行時間較長的任務不僅會造成執行緒池堵塞,甚至還會增加執行時間較短的任務的服務時間。如果執行緒池中的執行緒數量遠小於在穩定狀態下執行時間較長任務的數量,那麼到最後可能所有的執行緒都會執行這些執行時間較長的任務,從而影響整體的響應性。
 有一項技術可以緩解執行時間較長任務造成的影響,即限定任務等待資源的時間,而不要無限制的等待。在平臺類庫的大多數阻塞方法中,都同時定義了限時版本和無限時版本,例如Thread.joinBlockingQueue.putCountDownLatch.await以及Selector.selector等。如果等待超時,那麼可以將任務標識為失敗。然後終止任務或者將任務重新放回佇列以便隨後執行。這樣,無論任務的最終結果是否成功,這種辦法都能確保任務總能繼續執行下去,並將執行緒釋放出來,執行一些能更快完成的任務。如果線上程池中總是充滿了被阻塞的任務,那麼也有可能表明執行緒池的規模過小。

設定執行緒池的大小

 執行緒池的理想大小取決於被提交任務的型別以及所部署系統的特性。在程式碼中通常不會固定執行緒池的大小,而應該通過某種配置機制來提供,或根據Runtime.availableProcessors來動態計算大小。
 幸運的是,要設定執行緒池的大小也並不困難,只需要避免“過大”和“過小”這兩種極端情況。如果執行緒池過大,那麼大量的執行緒將在相對很少的CPU和記憶體資源上發生競爭,這不僅會導致更高的記憶體使用量,而且還有可能耗盡資源。如果執行緒過小,那麼將會導致許多空閒的處理器無法執行工作,從而降低吞吐量。
 要想正確的設定執行緒池的大小,必須分析計算環境、資源預算和任務的特性。在部署的系統中有多少個CPU?多大的記憶體?任務是計算密集型、I/O密集型還是二者皆可?它們是否需要像JDBC連線這樣的稀缺資源?如果需要執行不同類別的任務,並且它們之間的行為相差很大,那麼應該考慮使用多個執行緒池,從而使每個執行緒池可以根據自己的工作負載來調整。
 對於計算密集型的任務,在擁有N(cpu)個處理器的系統上,當執行緒池的大小為N(cpu)+1時,通常能實現最優的利用率。(即使當計算密集型的執行緒偶爾由於頁缺失故障或者其他原因而暫停時,這個“額外”的執行緒也能確保CPU的時鐘週期不會被浪費。)對於包含I/O操作或者其他阻塞操作的任務,由於執行緒並不會一直執行,因此執行緒的規模應該更大。要正確的設定執行緒池的大小,你必須估算出任務的等待時間與計算時間的比值。這種估算不需要很精確,並且可以通過一些分析或者監控工具來獲得。你還可以通過另一種方法來調節執行緒池的大小:在某個基準負載下,分別給不同大小的執行緒池來運作應用程式,並觀察CPU利用率、
 給定下列定義:

N(cpu)=number of CPUs
U(cpu)=targe CPU utilization,0≤U(cpu)≤1
W/C=ratio of wait time to compute time
 要使處理器達到期望的使用率,執行緒池的最優大小等於
N(threads)=N(cpu)*U(cpu)*(1+W/C)
 可以通過Runtime來獲得CPU的數目:
int N_CPUS = Runtime.getRuntimes().availableProcessors();

 當然CPU週期並不是唯一影響執行緒池大小的資源,還包括記憶體、檔案控制代碼,套接字控制代碼和資料庫連線池。計算這些資源對執行緒池的約束條件是更容易的:計算每個任務對該資源的需求量,然後用該資源的可用總量除以每個任務的需求量,所以結果就是執行緒池大小的上限。
 當任務需要通過某種資源池來管理資源時,例如資料庫連線,那麼執行緒池和資源池的大小將會影響。如果每個任務都需要一個數據庫連線,那麼連線池的大小就限制了執行緒池的大小。同樣,當執行緒池中的任務是資料庫連線的唯一使用者時,那麼執行緒池的大小又將限制連線池的大小。

配置ThreadPollExecutor

ThreadPoolExecutor為一些Executor提供了基本實現,這些Execuor是由Executors中的newCachedThreadPoolnewFixedThreadPoolnewScheduledThreadExecutor等工廠方法返回的,ThreadPoolExecutor是一個靈活的、穩定的執行緒池,允許進行各種定製。
 如果預設的執行策略不能滿足需求,那麼可以通過ThreadPoolExecutor的建構函式來例項化一個物件,並根據自己的需求來定製,並且可以參考Executors的原始碼來了解預設配置下的執行策略,然後再以這些執行策略為基礎進行修改。ThreadPoolExecutor定義了很多建構函式,下面給出了最常見的形式。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {...}

執行緒的建立與銷燬

 執行緒池的基本大小、最大大小以及存活時間等因素共同負責執行緒的建立與銷燬。基本大小也就是執行緒池的目標大小,即在沒有任務執行時執行緒池的大小,,並且只有在工作佇列滿了的情況下才會建立超出這個數量的執行緒。執行緒池的最大大小表示可同時活動的執行緒數量的上限。如果某個執行緒的空閒時間超過了存活時間,那麼將被標記為可回收的,並且當執行緒的當前大小超過了基本大小時,這個執行緒將被終止。
 通過調節執行緒池的基本大小和存活時間,可以幫助執行緒池回收空閒執行緒佔用的資源,從而使得這些資源可以用於執行其他工作。(顯然,這是折衷:回收空閒執行緒會產生額外的延遲,因為當需求增加時,必須建立新的執行緒來滿足需求。)
newFixedThreadPool工廠方法將執行緒池的基本大小設定為引數中指定的值,而且建立的執行緒池不會超時。newCachedThreadPool工廠方法將執行緒池的最大大小設定為Integer.MAX_VALUE,而將基本大小設定為零,並將超時設定為1分鐘,這種方法創造出來的執行緒池可以被無限擴充套件,並且當需求降低時會自動收縮。其他形式的執行緒可以顯示的ThreadPoolExecutor建構函式來構造。

管理佇列任務

 在有限的執行緒池中會限制可併發執行任務數量。(單執行緒的Executor是一種值得注意的特例:它們能夠確保不會有任務併發執行,因為它們通過執行緒封閉來實現執行緒安全性)
 如果無限制的建立執行緒,那麼將導致不穩定性,那麼則採用固定大小的執行緒池(而不是每收到一個請求就建立一個新執行緒)來解決這個問題。然而,這個方案並不完整,在高負載的情況下,應用程式仍可能耗盡資源,只是出現問題的概率較小。如果新請求的到達速率超過了執行緒池的處理速率,那麼新到來的請求將積累起來。線上程池中,這些請求會在一個由Executor管理的Runnable佇列中等待,而不會像執行緒那樣去競爭CPU資源。通過一個Runnable和一個連結串列節點來實現一個等待中的任務,當然比使用執行緒來表示的開銷低很多,但如果客戶提交給伺服器請求的速率超過了伺服器的處理速率,那麼仍然會耗盡資源。
 即使請求的平均達到速率很穩定,也仍然會出現請求突增的情況。儘管佇列有助於緩解任務的突增問題,但如果任務持續高速地到來,那麼最終還是會抑制請求的到達率以避免耗盡記憶體。甚至在耗盡記憶體之前,響應性效能也隨著任務佇列的增長而變得越來越糟糕。
ThreadPoolExecutor允許提供一個BlockingQueue來儲存等待執行的任務。基本的任務排隊方法有3種:無界佇列有界佇列同步移交。佇列的選擇與其他的配置引數有關,例如執行緒池的大小等。
newFixedThreadPoolnewSingleThreadExecutor在預設的情況下將使用一個無界的LinkedBlockingQueue。如果所有工作者執行緒都處於忙碌狀態,那麼任務將在佇列中等待。如果任務持續快速的到達,並且超過了執行緒池處理它們的速度,那麼佇列將無限制的增加。
 一種更穩妥的資源管理策略是使用有界佇列,例如ArrayBlockingQueue、有界的LinkedBlockingQueuePriorityBlockingQueue。有界佇列有助於避免資源耗盡的情況發生,但它又帶來了新的問題;當佇列填滿後,新的任務怎麼辦?(有許多飽和策略可以解決這個問題。)在使用有界的工作佇列時佇列的大小與執行緒池的大小必須一起調節。如果執行緒池較小而佇列較大,那麼有助於減少記憶體使用量,降低CPU的使用率,同時還可以減少上下文的切換,但付出的代價可能會限制吞吐量。
 對於非常大的或者無界的執行緒池,可以通過使用SynchronousQueue來避免任務排隊,以及直接將任務從生產者移交到工作者執行緒。SynchronousQueue不是一個真正的佇列,而是一種線上程之間進行移交的機制,要將一個元素放入SynchronousQueue中,必須有另一個執行緒正在等待接受這個元素。如果沒有執行緒正在等待,並且執行緒池的當前大小小於最大值,那麼ThreadPoolExecutor將建立一個新的執行緒,否則根據飽和策略,這個任務將被拒絕。使用直接移交將更高效,因為任務會直接移交給執行它的執行緒,而不是被首先放在佇列中,然後有工作者執行緒從佇列中提取該任務。只有當執行緒池是無界的或者可以拒絕任務時,SynchronousQueue才有實際價值。在newCacheThreadPool工廠方法就使用的是SynchronousQueue
 當使用像LinkedBlockingQueueArrayBlockQueue這樣的FIFO(先進先出)佇列時,任務的執行順序與它們的到達順序相同。如果想進一步控制任務執行順序,還可以使用PriorityBlockQueue,這個佇列將根據優先順序來安排任務。任務的優先順序是通過自然排序或者Comparator(如果執行緒實現了Comparable)來定義的。

對於ExecutornewCachedThreadPool工廠方法是一種很好的預設選擇,它能提供比固定大小的執行緒池更好的排隊效能。當需要限制當前任務的數量以滿足資源管理需要時,那麼可以選擇固定大小的執行緒池,就像在接受網路客戶請求的伺服器應用程式中,如果不進行限制,那麼很容易發生過載問題。

 只有當任務相互獨立時,為執行緒池或工作佇列設定界限才是合理的。如果任務之間存在依賴性,那麼有界的執行緒池或者佇列就可能導致執行緒“飢餓”死鎖的問題。此時應該使用無界的執行緒池,例如newCachedThreadpool

飽和策略

 當有界佇列被填滿後,飽和策略開始發揮作用。ThreadPoolExecutor的飽和策略可以通過呼叫setRejectedExecutionHandler來修改。(如果某個任務被提交到一個已被關閉的Executor時,也會用到飽和策略。)JDK提供了幾種不同的RejectedExecutionHandler實現,每種實現都包含不同的飽和策略:AbortPolicyCallerRunsPolicyDiscardPoilcyDiscardOldestPolicy
 “中止(Abort)”策略是預設的飽和策略,該策略將丟擲未檢查的RejectedExecutionException。呼叫者可以捕獲這個異常,然後根據需求編寫自己的處理程式碼,當提交的任務無法儲存到佇列中等待執行時,“拋棄(Discard)”策略會悄悄的拋棄該任務。“拋棄最舊的(DiscardOldest)”策略則會拋棄下一個將被執行的任務,然後嘗試重新提交新的任務。(如果工作佇列是一個優先佇列,那麼“拋棄最舊的策略”將導致拋棄優先順序最高的任務,因此最好不要將“拋棄最舊的”飽和策略和優先順序佇列放在一起使用。)
 "呼叫者執行(Caller-Runs)"策略實現了一種調節機制,該策略不會拋棄任務,也不會丟擲異常,而是將某些任務回退到呼叫者,從而降低新任務的流量,它不會線上程池的某個執行緒中執行新提交的任務,而是在一個呼叫了execute的執行緒中執行該任務。當執行緒池中的所有執行緒都被佔用,並且工作佇列被填滿後,下一個任務會在呼叫execute時在主執行緒執行。由於執行任務需要一定的事件,因此主執行緒至少在一段時間內不能提交任何任務,從而使得工作者執行緒有時間來處理正在執行的任務。在這期間,主執行緒不會呼叫accept
 當建立Executor時,可以選擇飽和策略或者對執行策略進行修改。下面給出瞭如何建立一個固定大小的執行緒池,同時使用“呼叫者執行”飽和策略。

ThreadPoolExecutor executor=new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunPolioy());

 當工作佇列被填滿後,沒有預定義的飽和策略來阻塞execute。然後通過使用Semaphore來限制任務的到達率,就可以實現這個功能。在下面的BoundedExecutor中給出了這種方法。該方法使用了一個無界佇列(因為不能限制佇列的大小和任務的到達率),並設定訊號量的上界設定為執行緒池的大小加上可排隊任務的數量,這是因為訊號量需要控制在執行的和等待的任務數量。

public class BoundedExecutor {
    private Executor exec;
    private Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submit(Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }

    }
}

執行緒工廠

 每當執行緒池需要建立一個執行緒時,都是通過執行緒工廠方法來完成的。預設的執行緒工廠方法將建立一個新的,非守護的執行緒,並且不包含特殊的配置的資訊。通過指定一個執行緒工廠方法,可以制定執行緒池的配置資訊。在ThreadFactory中只定義了一個方法newThread,每當執行緒池需要建立一個執行緒池時都會呼叫這個方法。
 然而,在許多情況下需要使用定製的執行緒工廠方法。例如,你希望為執行緒池中的執行緒指定一個UncaughExceptionHandler,或者例項化一個定製的Thread類用於執行除錯資訊的記錄。你還可以希望修改執行緒的優先順序(不推薦)或者守護狀態(不推薦)。或許你只是希望給執行緒取一個更有意義的名稱,用來解釋執行緒的轉儲資訊和錯誤日誌。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}. Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     * create a thread is rejected
     */
    Thread newThread(Runnable r);
}

 如果在應用程式中需要利用安全策略來控制對某些特殊程式碼庫的訪問許可權,那麼可以通過Executor中的privilegedThreadFactory工廠來定製自己的執行緒工廠。通過這種方式創建出來的執行緒,將與建立privilegedThreadFactory的執行緒擁有相同的訪問許可權、AccessControlContextcontextClassLoader。如果不使用privilegedThreadFactory,執行緒池建立的執行緒將從在需要新執行緒時呼叫executesubmit的客戶程式中繼承訪問許可權,從而導致令人困惑的安全性異常。

在呼叫建構函式後不在定製ThreadPoolExecutor

 在呼叫完ThreadPoolExecutor的建構函式後,仍然可以通過設定函式(Setter)來修改大多數傳遞給它的建構函式的引數(例如執行緒池的基本大小、最大大小、存活時間、執行緒工廠以及拒絕執行處理器)。如果Executor是通過Executors中的某個(newSingleThreadExecutor除外)工廠方法建立的,那麼將可以將結果的型別轉換為ThreadPoolExecutor以訪問設定器。
 在Executors中包含一個unconfigurableExecutorService工廠方法,該方法對一個現有的ExecutorService進行包裝,使其只暴露出ExecutorService的方法,因此不能對它進行配置。newSingleThreadExecutor返回按這種方式封裝的ExecutorService,而不是最初的ThreadPoolExecutor。雖然單執行緒的Executor實際上被實現為一個只包含唯一執行緒的執行緒池,但它也同樣確保了不會併發的執行任務。如果在程式碼中增加單執行緒Executor的執行緒池大小,那麼將破壞它的執行語義。
 可以在自己的Executor中使用這項技術達到以防止執行策略被修改。如果將ExecutorService暴露給不信任的程式碼。又不希望對其進行修改,就可以通過unconfigurableExecutorService來包裝它、