1. 程式人生 > 其它 >阻塞佇列、執行緒池、非同步

阻塞佇列、執行緒池、非同步

參考
https://www.cnblogs.com/aspirant/p/8657801.html
https://www.cnblogs.com/linguanh/p/8000063.html
《java多執行緒程式設計實戰指南》

BlockingQueue

阻塞佇列,顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:

從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;
常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)

  • 先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
  • 後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。  
    多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒),下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:

上圖所示:當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。

如上圖所示:當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。
這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。

BlockingQueue的核心方法

放入資料

(1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒);
(2)offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
(3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.

獲取資料

(1)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;
(2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
(3)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入;
(4)drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。

常見BlockingQueue

ArrayBlockingQueue

基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue,可能導致鎖的高爭用,進而導致較多的上下文切換;
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。ArrayBlockingQueue不會增加GC負擔,這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

LinkedBlockingQueue

基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

SynchronousQueue

一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。
宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:

  • 如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;
  • 但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。

對比

ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列。

  • 是否有界
    ArrayBlockingQueue是有界佇列, LinkedBlockingQueue既可以有界也可以無界
    排程
  • 排程
    LinkedBlockingQueue僅支援非公平排程
    ArrayBlockingQueue和SynchronousQueue支援公平和非公平排程
  • 適用場景
    LinkedBlockingQueue適合生產者和消費者執行緒併發程度較大的場景;
    ArrayBlockingQueue適合生產者和消費者執行緒併發程度較低的場景;
    SynchronousQueue適合生產者和消費者處理能力相差不大的場景。

其他阻塞佇列

DelayQueue

DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。
使用場景:DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。

PriorityBlockingQueue

基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖。

執行緒池

執行緒工廠

執行緒工廠可以統一執行緒生成的樣式,增加執行緒異常處理物件、定製執行緒名稱等。

為什麼用執行緒池

執行緒相比普通的物件而言,會佔用額外的儲存空間—棧空間,此外執行緒啟動會產生相應的執行緒排程開銷,執行緒的銷燬也有開銷,系統能夠建立的執行緒數受限於系統的處理器數目。
通過執行緒池來使用執行緒更加有效,避免不必要的反覆建立執行緒的開銷。

執行緒池的基本引數和原理

執行緒池預先建立一定數目的工作者執行緒,客戶端不需要向執行緒池借用執行緒而是將其需要執行的任務作為一個物件提交給執行緒池,執行緒池可能將這些任務快取在佇列之中,而執行緒池內部的各個工作者執行緒則不斷取出任務並執行之。因此,執行緒池可以看做基於生產者—消費者模式的一種服務。
ThreadPoolExecutor類是一個常用的執行緒池,客戶端可以呼叫ThreadPoolExecutor.submit方法提交任務

Public Future<?> submit(Runnable task); 
Public Future< T > submit(Callable<T> task);

Task如果是一個Runnable例項,沒有返回結果,Task如果是Callable例項,可以由返回結果。
執行緒池內部維護的工作者執行緒數量稱為執行緒池的大小。執行緒池大小有3種形態,當前執行緒池大小表示執行緒池種實際工作者執行緒的數量;最大執行緒池大小表示執行緒池中允許存在的工作者執行緒的數量上限;核心執行緒大小表示一個不大於最大執行緒池大小的工作者執行緒數量上限。

當前執行緒池大小<=核心執行緒大小<=最大執行緒池大小
或者
核心執行緒大小<=當前執行緒池大小<=最大執行緒池大小

構造引數

public ThreadPoolExecutor(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit unit,
                      BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory,
                      RejectedExecutionHandler handler) {
  //...
}

workQueue: 稱為工作佇列的阻塞佇列
corePoolSize:執行緒池核心大小
maximumPoolSize:最大執行緒池大小
keepAliveTime和unit指定執行緒池中空閒執行緒的最大存活時間
threadFactory 指定建立工作者執行緒的執行緒工廠

初始狀態下,客戶端每提交一個任務執行緒池就建立一個工作者執行緒來處理任務。隨著任務的提交,當前執行緒池大小相應增加,在當前執行緒池大小達到核心執行緒池大小是,新來的任務被存入到工作佇列之中。這些快取的任務由執行緒池種所有的工作者執行緒負責取出進行執行。執行緒池將任務放入工作佇列的時候呼叫的是BlockingQueue的非阻塞方法offer(E e),所以當工作佇列滿的時候不會使提交任務的客戶端執行緒暫停。當工作佇列滿的時候,執行緒池會繼續建立新的工作者執行緒,直到當前執行緒池大小達到最大執行緒池大小。
執行緒池是通過threadFactory的newThread方法來建立工作者執行緒的。如果在建立執行緒池的時候沒有指定執行緒工廠(呼叫了ThreadPoolExecutor的其他構造器),那麼ThreadPoolExecutor會使用Executord.defaultThreadFactory()所返回的預設執行緒工廠。
當執行緒池飽和的時候,即工作佇列滿且當前執行緒池大小達到最大執行緒池大小的情況下,客戶端試圖提交的任務就會被拒絕。RejectExecutionHandler介面用於封裝被拒絕的任務的處理策略,ThreadPoolExecutor提供幾個現成的RejectExecutionHandler的實現類,其中ThreadPoolExecutor.AbortPolicy是ThreadPoolExecutor使用的預設RejectExecutionHandler。如果預設的AbortPolicy無法滿足可以優先考慮ThreadPoolExecutor提供的其他RejectExecutionHandler,其次考慮自行實現RejectExecutionHandler。
以下為拒絕策略

  • AbortPolicy:直接丟擲異常,預設策略;
  • CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;
  • DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務;
  • DiscardPolicy:直接丟棄任務;
    在當前執行緒池大小超過核心執行緒池大小的時候,超過核心執行緒池大小部分的工作者執行緒空閒(即工作者佇列中沒有待處理的任務)時間達到了keepAliveTime所指定的時間後就會被清理掉,即這些工作者執行緒會自動終止並被從執行緒池中被移除,需要謹慎設定,否則造成執行緒反覆建立。
    ThreadPoolExecutor.shutdown()/ shutdownNow()方法可以用來關閉執行緒池。前者會等待已提交執行緒繼續執行,禁止提交新任務,後者直接停止會將正在執行的任務停止,已提交未執行的任務會也不會執行。

執行緒池任務執行結果

如果向執行緒池提交任務不需要返回結果,提交的任務為Runnable,如果需要處理結果,則提交的任務需要為Callable。
Public Future<?> submit(Runnable task);
Public Future< T > submit(Callable task);
Callable介面是runnable介面的增強,call方法的返回值代表相應任務的處理結果,call方法在執行中可以丟擲異常,而runnable介面無返回值也無法丟擲異常。
Future介面例項可以看做提交給執行緒池執行任務的處理結果控制代碼,Future.get()方法可以用來獲取task引數指定的任務的處理結果。Future.get()被呼叫時,如果任務未執行完,那麼Future.get()會使當前執行緒暫停,直到相應的任務執行結束,Future.get()是一個阻塞方法,能丟擲InterruptedException說明可以響應執行緒中斷。如果任務丟擲一個任意的originalExeption,那麼Future.get()會丟擲ExcutionException,通過呼叫ExcutionException的getCause()方法可以返回originalExeption,從而捕獲原始的異常。Future可以執行cancel(boolean mayInterruptRunning)來取消任務,mayInterruptRunning表示是否允許通過傳送中斷來取消任務。返回值表示相應任務是否取消成功。另外可以通過isCancelled判斷任務是否取消成功,通過isDone判斷任務是否執行完成。
另外執行緒池提供了一系列監控方法。
執行緒池死鎖,不應該將有依賴關係的任務提交給同一個執行緒池,避免死鎖。
注意應該儘早提交任務,儘量晚的執行Future.get(),減少上下文切換。

非同步

Executor

Executor介面是對任務執行的抽象,定義瞭如下方法

Void execute (Runnable command);

Executor介面使任務的提交和任務執行的具體細節解耦
不過Executor功能有限,只能執行任務,無法返回結果;Executor需要工作者執行緒,但是沒有釋放工作者執行緒資源的方法。ExecutorService介面繼承Executor介面,解決了以上的問題。ExecutorService定義了幾個submit方法可以接受Callable介面或者Runnable介面表示的任務並返回相應的Future例項。ExecutorService還定義了shutdown和shutdownNow方法來釋放相關資源。ThreadPoolExecutor是ExecutorService的預設實現類。

Executors與常用執行緒池

Executors是一個實用的工具類,可以返回預設執行緒工廠,將Runnable例項轉成Callable例項,還提供了一些能夠返回ExecutorService例項的快捷方法,這樣不用手動建立ThreadPoolExecutor。

  • newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(
            nThreads,   // corePoolSize
            nThreads,   // maximumPoolSize == corePoolSize
            0L,         // 空閒時間限制是 0
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>() // 無界阻塞佇列
        );
}

newFixedThreadPool 執行緒永不過期,適合固定執行緒的場景。

  • newCachedThreadPool
public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(
        0,                  // corePoolSoze == 0
        Integer.MAX_VALUE,  // maximumPoolSize 非常大
        60L,                // 空閒判定是60 秒
        TimeUnit.SECONDS,
        // 神奇的無儲存空間阻塞佇列,每個 put 必須要等待一個 take
        new SynchronousQueue<Runnable>()  
    );
}

newCachedThreadPool適合頻繁短時任務
由於核心執行緒池大小為0,因此提交給執行緒池執行的第一個任務會導致該執行緒池的第一個工作者執行緒建立啟動,後續任務提交的時候,由於當前執行緒池大小已經查過核心執行緒池大小(0),因此ThreadPoolExecutor會將任務快取到工作佇列中,即呼叫workerQueue.offer方法。
SynchronousQueue內部並不維護用於儲存佇列元素的實際儲存空間,一個執行緒(生產者)在執行SynchronousQueue. Offer(E)的時候,如果沒有其他執行緒(消費者)因執行SynchronousQueue. take()而被暫停,那麼SynchronousQueue. Offer(E)呼叫會直接丟擲false,即如佇列失敗,說明所有的工作者執行緒都在執行,即無空閒工作者執行緒的情況下給提提交任務會導致該任務無法被快取成功。當ThreadPoolExecutor在快取失敗且執行緒池當前大小未達到最大執行緒池大小的情況下會啟動新的工作者執行緒,極端情況下每提交一個任務均會建立一個新的工作者執行緒,會導致執行緒數太多,過多上下文切換導致系統被拖慢。
即在有一個任務被快取的情況下,會一直增加工作者執行緒。

  • newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return 
            new FinalizableDelegatedExecutorService
                (
                    new ThreadPoolExecutor
                        (
                            1,
                            1,
                            0L,
                            TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>(),
                            threadFactory
                        )
                );
    }

newSingleThreadExecutor適合多生產者-單消費者
可以看到除了多了個 FinalizableDelegatedExecutorService 代理,其初始化和 newFiexdThreadPool 的 nThreads = 1 的時候是一樣的。
區別就在於:
• newSingleThreadExecutor返回的ExcutorService在解構函式finalize()處會呼叫shutdown()
• 如果我們沒有對它呼叫shutdown(),那麼可以確保它在被回收時呼叫shutdown()來終止執行緒。
使用ThreadFactory,可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,一般採用預設。

FutereTask

FutereTask融合了Runnable和Callable的優點。FutereTask是Runnable介面的一個實現類,FutereTask表示的非同步任務可以提交給Executor例項或者工作者執行緒,此外FutereTask可以直接返回其代表的非同步任務的處理結果。FutereTask的一個構造方法可以將Callable封裝成FutereTask,相當於將Callable轉成了Runnable,同時還可以通過FutereTask檢視處理結果。
ThreadPoolExecutor.submit(Callable task)繼承AbstractExecutorService.submit方法,實際將Callable物件封裝成了FutereTask。

Executorservice.submit和excutorservice.execute的區別

ExecutorService介面繼承Executor介面,Executor是最上層的,其中只包含一個execute()方法, execute()方法的入參為一個Runnable,返回值為void

public interface Executor {
    void execute(Runnable command);
}

submit()是ExecutorService介面中的方法,在ExecutorService介面中,一共有以上三個sumbit()方法,入參可以為Callable,也可以為Runnable,而且方法有返回值Future

public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);

  <T> Future<T> submit(Runnable task, T result);

  Future<?> submit(Runnable task);
  ...
}

execute()和submit()方法的區別總結:
(1)接收的引數不一樣;
(2)submit()有返回值,而execute()沒有;
(3)Exception處理方式不同
例如,如果task裡會丟擲checked或者unchecked exception,而你又希望外面的呼叫者能夠感知這些exception並做出及時的處理,那麼就需要用到submit,通過對Future.get()進行丟擲異常的捕獲,然後對其進行處理。
如果呼叫execute()提交任務中丟擲來未捕獲的異常,則對其進行執行任務的工作者執行緒就會異常中止,雖然執行緒池會建立新的工作者執行緒,但是這個有開銷。
UncaughtExceptionHandler可以用於捕獲執行緒異常,UncaughtExceptionHandler只有在execute()方法裡異常處理類才能生效,通過submit()提交的任務,UncaughtExceptionHandler無法生效。

多執行緒的幾種實現方式

(1)繼承Thread,重寫run方法;
(2)通過Runnable物件構建Thread物件;
(3)執行緒池,ThreadPoolExecutor或者Executors快捷生成執行緒池;