1. 程式人生 > >解讀 java 併發佇列 BlockingQueue

解讀 java 併發佇列 BlockingQueue

 

點選新增圖片描述(最多60個字)編輯

 

 

今天呢!燈塔君跟大家講:

解讀 java 併發佇列 BlockingQueue

最近得空,想寫篇文章好好說說 java 執行緒池問題,我相信很多人都一知半解的,包括我自己在仔仔細細看原始碼之前,也有許多的不解,甚至有些地方我一直都沒有理解到位。

說到執行緒池實現,那麼就不得不涉及到各種 BlockingQueue 的實現,那麼我想就 BlockingQueue 的問題和大家分享分享我瞭解的一些知識。

本文沒有像之前分析 AQS 那樣一行一行原始碼分析了,不過還是把其中最重要和最難理解的程式碼說了一遍,所以不免篇幅略長。本文涉及到比較多的 Doug Lea 對 BlockingQueue 的設計思想,希望有心的讀者真的可以有一些收穫,我覺得自己還是寫了一些乾貨的。

本文直接參考 Doug Lea 寫的 Java doc 和註釋,這也是我們在學習 java 併發包時最好的材料了。希望大家能有所思、有所悟,學習 Doug Lea 的程式碼風格,並將其優雅、嚴謹的作風應用到我們寫的每一行程式碼中。

目錄:

BlockingQueue

開篇先介紹下 BlockingQueue 這個介面的規則,後面再看其實現。

首先,最基本的來說, BlockingQueue 是一個先進先出的佇列(Queue),為什麼說是阻塞(Blocking)的呢?是因為 BlockingQueue 支援當獲取佇列元素但是佇列為空時,會阻塞等待佇列中有元素再返回;也支援新增元素時,如果佇列已滿,那麼等到佇列可以放入新元素時再放入。

BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。

BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用:1、丟擲異常;2、返回特殊值(null 或 true/false,取決於具體的操作);3、阻塞等待此操作,直到這個操作成功;4、阻塞等待此操作,直到成功或者超時指定時間。總結如下:

 

點選新增圖片描述(最多60個字)編輯

 

BlockingQueue 的各個實現都遵循了這些規則,當然我們也不用死記這個表格,知道有這麼回事,然後寫程式碼的時候根據自己的需要去看方法的註釋來選取合適的方法即可。

對於 BlockingQueue,我們的關注點應該在 put(e) 和 take() 這兩個方法,因為這兩個方法是帶阻塞的。

BlockingQueue 不接受 null 值的插入,相應的方法在碰到 null 的插入時會丟擲 NullPointerException 異常。null 值在這裡通常用於作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時候,就不能很好地用 null 來判斷到底是代表失敗,還是獲取的值就是 null 值。

一個 BlockingQueue 可能是有界的,如果在插入的時候,發現佇列滿了,那麼 put 操作將會阻塞。通常,在這裡我們說的無界佇列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。

BlockingQueue 是設計用來實現生產者-消費者佇列的,當然,你也可以將它當做普通的 Collection 來用,前面說了,它實現了 java.util.Collection 介面。例如,我們可以用 remove(x) 來刪除任意一個元素,但是,這類操作通常並不高效,所以儘量只在少數的場合使用,比如一條訊息已經入隊,但是需要做取消操作的時候。

BlockingQueue 的實現都是執行緒安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll  不一定是原子操作。如 addAll(c) 有可能在添加了一些元素後中途丟擲異常,此時 BlockingQueue 中已經添加了部分元素,這個是允許的,取決於具體的實現。

BlockingQueue 不支援 close 或 shutdown 等關閉操作,因為開發者可能希望不會有新的元素新增進去,此特性取決於具體的實現,不做強制約束。

最後,BlockingQueue 在生產者-消費者的場景中,是支援多消費者和多生產者的,說的其實就是執行緒安全問題。

相信上面說的每一句都很清楚了,BlockingQueue 是一個比較簡單的執行緒安全容器,下面我會分析其具體的在 JDK 中的實現,這裡又到了 Doug Lea 表演時間了。

BlockingQueue 實現之 ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 介面的有界佇列實現類,底層採用陣列來實現。

其併發控制採用可重入鎖來控制,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。

如果讀者看過我之前寫的《一行一行原始碼分析清楚 AbstractQueuedSynchronizer(二)》 的關於 Condition 的文章的話,那麼你一定能很容易看懂 ArrayBlockingQueue  的原始碼,它採用一個 ReentrantLock 和相應的兩個 Condition 來實現。

ArrayBlockingQueue 共有以下幾個屬性:

點選新增圖片描述(最多60個字)編輯

 

我們用個示意圖來描述其同步機制:

點選新增圖片描述(最多60個字)編輯

 

ArrayBlockingQueue 實現併發同步的原理就是,讀操作和寫操作都需要獲取到 AQS 獨佔鎖才能進行操作。如果佇列為空,這個時候讀操作的執行緒進入到讀執行緒佇列排隊,等待寫執行緒寫入新的元素,然後喚醒讀執行緒佇列的第一個等待執行緒。如果佇列已滿,這個時候寫操作的執行緒進入到寫執行緒佇列排隊,等待讀執行緒將佇列元素移除騰出空間,然後喚醒寫執行緒佇列的第一個等待執行緒。

對於 ArrayBlockingQueue,我們可以在構造的時候指定以下三個引數:

  1. 佇列容量,其限制了佇列中最多允許的元素個數;
  2. 指定獨佔鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的執行緒獲取到鎖;
  3. 可以指定用一個集合來初始化,將此集合中的元素在構造方法期間就先新增到佇列中。

更具體的原始碼我就不進行分析了,因為它就是 AbstractQueuedSynchronizer 中 Condition 的使用,感興趣的讀者請看我寫的《一行一行原始碼分析清楚 AbstractQueuedSynchronizer(二)》,因為只要看懂了那篇文章,ArrayBlockingQueue 的程式碼就沒有分析的必要了,當然,如果你完全不懂 Condition,那麼基本上也就可以說看不懂 ArrayBlockingQueue 的原始碼了。

BlockingQueue 實現之 LinkedBlockingQueue

底層基於單向連結串列實現的阻塞佇列,可以當做無界佇列也可以當做有界佇列來使用。看構造方法:

 

點選新增圖片描述(最多60個字)編輯

 

 

點選新增圖片描述(最多60個字)編輯

我們看看這個類有哪些屬性:

點選新增圖片描述(最多60個字)編輯

這裡用了兩個鎖,兩個 Condition,簡單介紹如下:

takeLock 和 notEmpty 怎麼搭配:如果要獲取(take)一個元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果佇列此時為空,還需要佇列不為空(notEmpty)這個條件(Condition)。

putLock 需要和 notFull 搭配:如果要插入(put)一個元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果佇列此時已滿,還需要佇列不是滿的(notFull)這個條件(Condition)。

首先,這裡用一個示意圖來看看 LinkedBlockingQueue 的併發讀寫控制,然後再開始分析原始碼:

點選新增圖片描述(最多60個字)編輯

 

看懂這個示意圖,原始碼也就簡單了,讀操作是排好隊的,寫操作也是排好隊的,唯一的併發問題在於一個寫操作和一個讀操作同時進行,只要控制好這個就可以了。

先上構造方法:

點選新增圖片描述(最多60個字)編輯

注意,這裡會初始化一個空的頭結點,那麼第一個元素入隊的時候,佇列中就會有兩個元素。讀取元素時,也總是獲取頭節點後面的一個節點。count 的計數值不包括這個頭節點。

我們來看下 put 方法是怎麼將元素插入到隊尾的:

點選新增圖片描述(最多60個字)編輯

我們再看看 take 方法:

 

點選新增圖片描述(最多60個字)編輯

原始碼分析就到這裡結束了吧,畢竟還是比較簡單的原始碼,基本上只要讀者認真點都看得懂。

BlockingQueue 實現之 SynchronousQueue

它是一個特殊的佇列,它的名字其實就蘊含了它的特徵 - - 同步的佇列。為什麼說是同步的呢?這裡說的並不是多執行緒的併發問題,而是因為當一個執行緒往佇列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個執行緒來將這個元素拿走;同理,當一個讀執行緒做讀操作的時候,同樣需要一個相匹配的寫執行緒的寫操作。這裡的 Synchronous 指的就是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒。

我們比較少使用到 SynchronousQueue 這個類,不過它線上程池的實現類 ThreadPoolExecutor 中得到了應用,感興趣的讀者可以在看完這個後去看看相應的使用。

雖然上面我說了佇列,但是 SynchronousQueue 的佇列其實是虛的,其不提供任何空間(一個都沒有)來儲存元素。資料必須從某個寫執行緒交給某個讀執行緒,而不是寫到某個佇列中等待被消費。

你不能在 SynchronousQueue 中使用 peek 方法(在這裡這個方法直接返回 null),peek 方法的語義是隻讀取不移除,顯然,這個方法的語義是不符合 SynchronousQueue 的特徵的。SynchronousQueue 也不能被迭代,因為根本就沒有元素可以拿來迭代的。雖然 SynchronousQueue 間接地實現了 Collection 介面,但是如果你將其當做 Collection 來用的話,那麼集合是空的。當然,這個類也是不允許傳遞 null 值的(併發包中的容器類好像都不支援插入 null 值,因為 null 值往往用作其他用途,比如用於方法的返回值代表操作失敗)。

接下來,我們來看看具體的原始碼實現吧,它的原始碼不是很簡單的那種,我們需要先搞清楚它的設計思想。

原始碼加註釋大概有 1200 行,我們先看大框架:

點選新增圖片描述(最多60個字)編輯

 

Transferer 有兩個內部實現類,是因為構造 SynchronousQueue 的時候,我們可以指定公平策略。公平模式意味著,所有的讀寫執行緒都遵守先來後到,FIFO 嘛,對應 TransferQueue。而非公平模式則對應 TransferStack。

點選新增圖片描述(最多60個字)編輯

我們先採用公平模式分析原始碼,然後再說說公平模式和非公平模式的區別。

接下來,我們看看 put 方法和 take 方法:

 

點選新增圖片描述(最多60個字)編輯

我們看到,寫操作 put(E o) 和讀操作 take() 都是呼叫 Transferer.transfer(…) 方法,區別在於第一個引數是否為 null 值。

我們來看看 transfer 的設計思路,其基本演算法如下:

  1. 當呼叫這個方法時,如果佇列是空的,或者佇列中的節點和當前的執行緒操作型別一致(如當前操作是 put 操作,而佇列中的元素也都是寫執行緒)。這種情況下,將當前執行緒加入到等待佇列即可。
  2. 如果佇列中有等待節點,而且與當前操作可以匹配(如佇列中都是讀操作執行緒,當前執行緒是寫操作執行緒,反之亦然)。這種情況下,匹配等待佇列的隊頭,出隊,返回相應資料。

其實這裡有個隱含的條件被滿足了,佇列如果不為空,肯定都是同種型別的節點,要麼都是讀操作,要麼都是寫操作。這個就要看到底是讀執行緒積壓了,還是寫執行緒積壓了。

我們可以假設出一個男女配對的場景:一個男的過來,如果一個人都沒有,那麼他需要等待;如果發現有一堆男的在等待,那麼他需要排到佇列後面;如果發現是一堆女的在排隊,那麼他直接牽走隊頭的那個女的。

既然這裡說到了等待佇列,我們先看看其實現,也就是 QNode:

點選新增圖片描述(最多60個字)編輯

相信說了這麼多以後,我們再來看 transfer 方法的程式碼就輕鬆多了。

點選新增圖片描述(最多60個字)編輯

 

點選新增圖片描述(最多60個字)編輯

 

Doug Lea 的巧妙之處在於,將各個程式碼湊在了一起,使得程式碼非常簡潔,當然也同時增加了我們的閱讀負擔,看程式碼的時候,還是得仔細想想各種可能的情況。

下面,再說說前面說的公平模式和非公平模式的區別。

相信大家心裡面已經有了公平模式的工作流程的概念了,我就簡單說說 TransferStack 的演算法,就不分析原始碼了。

  1. 當呼叫這個方法時,如果佇列是空的,或者佇列中的節點和當前的執行緒操作型別一致(如當前操作是 put 操作,而棧中的元素也都是寫執行緒)。這種情況下,將當前執行緒加入到等待棧中,等待配對。然後返回相應的元素,或者如果被取消了的話,返回 null。
  2. 如果棧中有等待節點,而且與當前操作可以匹配(如棧裡面都是讀操作執行緒,當前執行緒是寫操作執行緒,反之亦然)。將當前節點壓入棧頂,和棧中的節點進行匹配,然後將這兩個節點出棧。配對和出棧的動作其實也不是必須的,因為下面的一條會執行同樣的事情。
  3. 如果棧頂是進行匹配而入棧的節點,幫助其進行匹配並出棧,然後再繼續操作。

應該說,TransferStack 的原始碼要比 TransferQueue 的複雜一些,如果讀者感興趣,請自行進行原始碼閱讀。

BlockingQueue 實現之 PriorityBlockingQueue

帶排序的 BlockingQueue 實現,其併發控制採用的是 ReentrantLock,佇列為無界佇列(ArrayBlockingQueue 是有界佇列,LinkedBlockingQueue 也可以通過在建構函式中傳入 capacity 指定佇列最大的容量,但是 PriorityBlockingQueue 只能指定初始的佇列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。

簡單地說,它就是 PriorityQueue 的執行緒安全版本。不可以插入 null 值,同時,插入佇列的物件必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界佇列(take 方法在佇列為空的時候會阻塞)。

它的原始碼相對比較簡單,本節將介紹其核心原始碼部分。

我們來看看它有哪些屬性:

點選新增圖片描述(最多60個字)編輯

 

此類實現了 Collection 和 Iterator 介面中的所有介面方法,對其物件進行迭代並遍歷時,不能保證有序性。如果你想要實現有序遍歷,建議採用 Arrays.sort(queue.toArray()) 進行處理。PriorityBlockingQueue 提供了 drainTo 方法用於將部分或全部元素有序地填充(準確說是轉移,會刪除原佇列中的元素)到另一個集合中。還有一個需要說明的是,如果兩個物件的優先順序相同(compare 方法返回 0),此佇列並不保證它們之間的順序。

PriorityBlockingQueue 使用了基於陣列的二叉堆來存放元素,所有的 public 方法採用同一個 lock 進行併發控制。

二叉堆:一顆完全二叉樹,它非常適合用陣列進行儲存,對於陣列中的元素 a[i],其左子節點為 a[2*i+1],其右子節點為 a[2*i + 2],其父節點為 a[(i-1)/2],其堆序性質為,每個節點的值都小於其左右子節點的值。二叉堆中最小的值就是根節點,但是刪除根節點是比較麻煩的,因為需要調整樹。

簡單用個圖解釋一下二叉堆,我就不說太多專業的嚴謹的術語了,這種資料結構的優點是一目瞭然的,最小的元素一定是根元素,它是一棵滿的樹,除了最後一層,最後一層的節點從左到右緊密排列。

點選新增圖片描述(最多60個字)編輯

下面開始 PriorityBlockingQueue 的原始碼分析,首先我們來看看構造方法:

 

點選新增圖片描述(最多60個字)編輯

 

接下來,我們來看看其內部的自動擴容實現:

 

 

點選新增圖片描述(最多60個字)編輯

 

擴容方法對併發的控制也非常的巧妙,釋放了原來的獨佔鎖 lock,這樣的話,擴容操作和讀操作可以同時進行,提高吞吐量。

下面,我們來分析下寫操作 put 方法和讀操作 take 方法。

點選新增圖片描述(最多60個字)編輯

對於二叉堆而言,插入一個節點是簡單的,插入的節點如果比父節點小,交換它們,然後繼續和父節點比較。

 

點選新增圖片描述(最多60個字)編輯

 

我們用圖來示意一下,我們接下來要將 11 插入到佇列中,看看 siftUp 是怎麼操作的。

 

點選新增圖片描述(最多60個字)編輯

 

 

我們再看看 take 方法:

 

點選新增圖片描述(最多60個字)編輯

dequeue 方法返回隊頭,並調整二叉堆的樹,呼叫這個方法必須先獲取獨佔鎖。

廢話不多說,出隊是非常簡單的,因為隊頭就是最小的元素,對應的是陣列的第一個元素。難點是隊頭出隊後,需要調整樹。

點選新增圖片描述(最多60個字)編輯

 

點選新增圖片描述(最多60個字)編輯

 

記住二叉堆是一棵完全二叉樹,那麼根節點 10 拿掉後,最後面的元素 17 必須找到合適的地方放置。首先,17 和 10 不能直接交換,那麼先將根節點 10 的左右子節點中較小的節點往上滑,即 12 往上滑,然後原來 12 留下了一個空節點,然後再把這個空節點的較小的子節點往上滑,即 13 往上滑,最後,留出了位子,17 補上即可。

我稍微調整下這個樹,以便讀者能更明白:

點選新增圖片描述(最多60個字)編輯

好了, PriorityBlockingQueue 我們也說完了。

總結

我知道本文過長,相信一字不漏看完的讀者肯定是少數。

ArrayBlockingQueue 底層是陣列,有界佇列,如果我們要使用生產者-消費者模式,這是非常好的選擇。

LinkedBlockingQueue 底層是連結串列,可以當做無界和有界佇列來使用,所以大家不要以為它就是無界佇列。

SynchronousQueue 本身不帶有空間來儲存任何元素,使用上可以選擇公平模式和非公平模式。

PriorityBlockingQueue 是無界佇列,基於陣列,資料結構為二叉堆,陣列第一個也是樹的根節點總是最小值。