Java多執行緒 之BlockingQueue深入分析
二、BlockingQueue定義的常用方法
1.BlockingQueue定義的常用方法如下:
1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則招聘異常
2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到
4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null
5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止
其中:BlockingQueue 不接受null 元素。試圖add、put 或offer 一個null 元素時,某些實現會丟擲NullPointerException。null 被用作指示poll 操作失敗的警戒值。
三、BlockingQueue的幾個注意點
【1】BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個remainingCapacity,超出此容量,便無法無阻塞地put 附加元素。沒有任何內部容量約束的BlockingQueue 總是報告Integer.MAX_VALUE 的剩餘容量。
【2】BlockingQueue 實現主要用於生產者-使用者佇列,但它另外還支援Collection
介面。因此,舉例來說,使用remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。
【3】BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的
【4】BlockingQueue 實質上不 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的end-of-stream 或poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。
四、簡要概述BlockingQueue常用的四個實現類
1)ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的
3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序.
4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.
其中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue.
五、具體BlockingQueue的實現類的內部細節
有耐心的同學請看具體實現類細節:
1、ArrayBlockingQueue
ArrayBlockingQueue是一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素。佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列檢索操作則是從佇列頭部開始獲得元素。
這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致放入操作受阻塞;試圖從空佇列中檢索元素將導致類似阻塞。
ArrayBlockingQueue建立的時候需要指定容量capacity(可以儲存的最大的元素個數,因為它不會自動擴容)以及是否為公平鎖(fair引數)。
在建立ArrayBlockingQueue的時候預設建立的是非公平鎖,不過我們可以在它的建構函式裡指定。這裡呼叫ReentrantLock的建構函式建立鎖的時候,呼叫了:
public ReentrantLock(boolean fair) {
sync = (fair)? new FairSync() : new NonfairSync();
}
FairSync/ NonfairSync是ReentrantLock的內部類:
執行緒按順序請求獲得公平鎖,而一個非公平鎖可以闖入,且當它尚未進入等待佇列,就會和等待佇列head結點的執行緒發生競爭,如果鎖的狀態可用,請求非公平鎖的執行緒可在等待佇列中向前跳躍,獲得該鎖。內部鎖synchronized沒有提供確定的公平性保證。
分三點來講這個類:
2.1 新增新元素的方法:add/put/offer
2.2 該類的幾個例項變數:takeIndex/putIndex/count/
2.3 Condition實現
1.1 新增新元素的方法:add/put/offer
首先,談到新增元素的方法,首先得分析以下該類同步機制中用到的鎖:
Java程式碼
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();//Condition Variable 1
- notFull = lock.newCondition();//Condition Variable 2
這三個都是該類的例項變數,只有一個鎖lock,然後lock例項化出兩個Condition,notEmpty/noFull分別用來協調多執行緒的讀寫操作。
Java程式碼
- publicboolean offer(E e) {
- if (e == null) thrownew NullPointerException();
- final ReentrantLock lock = this.lock;//每個物件對應一個顯示的鎖
- lock.lock();//請求鎖直到獲得鎖(不可以被interrupte)
- try {
- if (count == items.length)//如果佇列已經滿了
- returnfalse;
- else {
- insert(e);
- returntrue;
- }
- } finally {
- lock.unlock();//
- }
- }
- 看insert方法:
- privatevoid insert(E x) {
- items[putIndex] = x;
- //增加全域性index的值。
- /*
- Inc方法體內部:
- final int inc(int i) {
- return (++i == items.length)? 0 : i;
- }
- 這裡可以看出ArrayBlockingQueue採用從前到後向內部陣列插入的方式插入新元素的。如果插完了,putIndex可能重新變為0(在已經執行了移除操作的前提下,否則在之前的判斷中佇列為滿)
- */
- putIndex = inc(putIndex);
- ++count;
- notEmpty.signal();//wake up one waiting thread
- }
Java程式碼
- publicvoid put(E e) throws InterruptedException {
- if (e == null) thrownew NullPointerException();
- final E[] items = this.items;
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();//請求鎖直到得到鎖或者變為interrupted
- try {
- try {
- while (count == items.length)//如果滿了,當前執行緒進入noFull對應的等waiting狀態
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- insert(e);
- } finally {
- lock.unlock();
- }
- }
Java程式碼
- publicboolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (e == null) thrownew NullPointerException();
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- if (count != items.length) {
- insert(e);
- returntrue;
- }
- if (nanos <= 0)
- returnfalse;
- try {
- //如果沒有被 signal/interruptes,需要等待nanos時間才返回
- nanos = notFull.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to non-interrupted thread