Java多執行緒之BlockingQueue深入分析
一、概述:
BlockingQueue作為執行緒容器,可以為執行緒同步提供有力的保障。
二、BlockingQueue定義的常用方法
1.BlockingQueue定義的常用方法如下:
丟擲異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用
1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則招聘異常
2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.
4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null
5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止
其中:BlockingQueue 不接受null 元素。試圖add、put 或offer 一個null 元素時,某些實現會丟擲NullPointerException。null 被用作指示poll 操作失敗的警戒值。
三、BlockingQueue的幾個注意點
【1】BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個remainingCapacity,超出此容量,便無法無阻塞地put 附加元素。沒有任何內部容量約束的BlockingQueue 總是報告Integer.MAX_VALUE 的剩餘容量。
【2】BlockingQueue 實現主要用於生產者-使用者佇列,但它另外還支援Collection 介面。因此,舉例來說,使用remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。
【3】BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和removeAll)沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了c 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。
【4】BlockingQueue 實質上不 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的end-of-stream 或poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。
四、簡要概述BlockingQueue常用的四個實現類
1)ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的
3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序.
4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.
其中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue.
五、具體BlockingQueue的實現類的內部細節
有耐心的同學請看具體實現類細節:
1、ArrayBlockingQueue
ArrayBlockingQueue是一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素。佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列檢索操作則是從佇列頭部開始獲得元素。
這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致放入操作受阻塞;試圖從空佇列中檢索元素將導致類似阻塞。
ArrayBlockingQueue建立的時候需要指定容量capacity(可以儲存的最大的元素個數,因為它不會自動擴容)以及是否為公平鎖(fair引數)。
在建立ArrayBlockingQueue的時候預設建立的是非公平鎖,不過我們可以在它的建構函式裡指定。這裡呼叫ReentrantLock的建構函式建立鎖的時候,呼叫了:
public ReentrantLock(boolean fair) {
sync = (fair)? new FairSync() : new NonfairSync();
}
FairSync/ NonfairSync是ReentrantLock的內部類:
執行緒按順序請求獲得公平鎖,而一個非公平鎖可以闖入,且當它尚未進入等待佇列,就會和等待佇列head結點的執行緒發生競爭,如果鎖的狀態可用,請求非公平鎖的執行緒可在等待佇列中向前跳躍,獲得該鎖。內部鎖synchronized沒有提供確定的公平性保證。
分三點來講這個類:
2.1 新增新元素的方法:add/put/offer
2.2 該類的幾個例項變數:takeIndex/putIndex/count/
2.3 Condition實現
1.1 新增新元素的方法:add/put/offer
首先,談到新增元素的方法,首先得分析以下該類同步機制中用到的鎖:
Java程式碼
[java]
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();//Condition Variable 1
notFull = lock.newCondition();//Condition Variable 2
這三個都是該類的例項變數,只有一個鎖lock,然後lock例項化出兩個Condition,notEmpty/noFull分別用來協調多執行緒的讀寫操作。
Java程式碼
[java]
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;//每個物件對應一個顯示的鎖
lock.lock();//請求鎖直到獲得鎖(不可以被interrupte)
try {
if (count == items.length)//如果佇列已經滿了
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();//
}
}
看insert方法:
private void insert(E x) {
items[putIndex] = x;
//增加全域性index的值。
/*
Inc方法體內部:
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
這裡可以看出ArrayBlockingQueue採用從前到後向內部陣列插入的方式插入新元素的。如果插完了,putIndex可能重新變為0(在已經執行了移除操作的前提下,否則在之前的判斷中佇列為滿)
*/
putIndex = inc(putIndex);
++count;
notEmpty.signal();//wake up one waiting thread
}
Java程式碼
[java]
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//請求鎖直到得到鎖或者變為interrupted
try {
try {
while (count == items.length)//如果滿了,當前執行緒進入noFull對應的等waiting狀態
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
Java程式碼
[java]
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
if (nanos <= 0)
return false;
try {
//如果沒有被 signal/interruptes,需要等待nanos時間才返回
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
Java程式碼
[java]
public boolean add(E e) {
return super.add(e);
}
父類:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
1.2 該類的幾個例項變數:takeIndex/putIndex/count
Java程式碼
[java]
用三個數字來維護這個佇列中的資料變更:
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
提取元素的三個方法take/poll/remove內部都呼叫了這個方法:
Java程式碼
[java]
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;//移除已經被提取出的元素
takeIndex = inc(takeIndex);//策略和新增元素時相同
--count;
notFull.signal();//提醒其他在notFull這個Condition上waiting的執行緒可以嘗試工作了
return x;
}
從這個方法裡可見,tabkeIndex維護一個可以提取/移除元素的索引位置,因為takeIndex是從0遞增的,所以這個類是FIFO佇列。
putIndex維護一個可以插入的元素的位置索引。
count顯然是維護佇列中已經存在的元素總數。
1.3 Condition實現
Condition現在的實現只有java.util.concurrent.locks.AbstractQueueSynchoronizer內部的ConditionObject,並且通過ReentranLock的newCondition()方法暴露出來,這是因為Condition的await()/sinal()一般在lock.lock()與lock.unlock()之間執行,當執行condition.await()方法時,它會首先釋放掉本執行緒持有的鎖,然後自己進入等待佇列。直到sinal(),喚醒後又會重新試圖去拿到鎖,拿到後執行await()下的程式碼,其中釋放當前鎖和得到當前鎖都需要ReentranLock的tryAcquire(int arg)方法來判定,並且享受ReentranLock的重進入特性。
Java程式碼
[java]
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//加一個新的condition等待節點
Node node = addConditionWaiter();
//釋放自己的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果當前執行緒 等待狀態時CONDITION,park住當前執行緒,等待condition的signal來解除
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
2、SynchronousQueue
一種阻塞佇列,其中每個 put 必須等待一個 take,反之亦然。同步佇列沒有任何內部容量,甚至連一個佇列的容量都沒有。不能在同步佇列上進行 peek,因為僅在試圖要取得元素時,該元素才存在;除非另一個執行緒試圖移除某個元素,否則也不能(使用任何方法)新增元素;也不能迭代佇列,因為其中沒有元素可用於迭代。佇列的頭 是嘗試新增到佇列中的首個已排隊執行緒元素;如果沒有已排隊執行緒,則不新增元素並且頭為 null。對於其他Collection 方法(例如 contains),SynchronousQueue 作為一個空集合。此佇列不允許 null 元素。
同步佇列類似於 CSP 和 Ada 中使用的 rendezvous 通道。它非常適合於傳遞性設計,在這種設計中,在一個執行緒中執行的物件要將某些資訊、事件或任務傳遞給在另一個執行緒中執行的物件,它就必須與該物件同步。
對於正在等待的生產者和使用者執行緒而言,此類支援可選的公平排序策略。預設情況下不保證這種排序。但是,使用公平設定為 true 所構造的佇列可保證執行緒以 FIFO 的順序進行訪問。公平通常會降低吞吐量,但是可以減小可變性並避免得不到服務。
3、LinkedBlockingQueue
一個基於已連結節點的、範圍任意的 blocking queue。此佇列按 FIFO(先進先出)排序元素。佇列的頭部 是在佇列中時間最長的元素。佇列的尾部 是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列檢索操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。
單向連結串列結構的佇列。如果不指定容量預設為Integer.MAX_VALUE。通過putLock和takeLock兩個鎖進行同步,兩個鎖分別例項化notFull和notEmpty兩個Condtion,用來協調多執行緒的存取動作。其中某些方法(如remove,toArray,toString,clear等)的同步需要同時獲得這兩個鎖,並且總是先putLock.lock緊接著takeLock.lock(在同一方法fullyLock中),這樣的順序是為了避免可能出現的死鎖情況(我也想不明白為什麼會是這樣?)
4、PriorityBlockingQueue
一個無界的阻塞佇列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞檢索的操作。雖然此佇列邏輯上是無界的,但是由於資源被耗盡,所以試圖執行新增操作可能會失敗(導致 OutOfMemoryError)。此類不允許使用 null 元素。依賴自然順序的優先順序佇列也不允許插入不可比較的物件(因為這樣做會丟擲ClassCastException)。
看它的三個屬性,就基本能看懂這個類了:
Java程式碼
[java]
private final PriorityQueue q;
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition notEmpty = lock.newCondition();
lock說明本類使用一個lock來同步讀寫等操作。
notEmpty協調佇列是否有新元素提供,而佇列滿了以後會呼叫PriorityQueue的grow方法來擴容。
5、DelayQueue
Delayed 元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。當一個元素的getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿。此佇列不允許使用 null 元素。
Delayed介面繼承自Comparable,我們插入的E元素都要實現這個介面。
DelayQueue的設計目的間API文件:
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will returnnull. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
因為DelayQueue構造函數了裡限定死不允許傳入comparator(之前的PriorityBlockingQueue中沒有限定死),即只能在compare方法裡定義優先順序的比較規則。再看上面這段英文,“The head of the queue is that Delayed element whose delay expired furthest in the past.”說明compare方法實現的時候要保證最先加入的元素最早結束延時。而 “Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.”說明getDelay方法的實現必須保證延時到了返回的值變為<=0的int。
上面這段英文中,還說明了:在poll/take的時候,佇列中元素會判定這個elment有沒有達到超時時間,如果沒有達到,poll返回null,而take進入等待狀態。但是,除了這兩個方法,佇列中的元素會被當做正常的元素來對待。例如,size方法返回所有元素的數量,而不管它們有沒有達到超時時間。而協調的Condition available只對take和poll是有意義的。
另外需要補充的是,在ScheduledThreadPoolExecutor中工作佇列型別是它的內部類DelayedWorkQueue,而DelayedWorkQueue的Task容器是DelayQueue型別,而ScheduledFutureTask作為Delay的實現類作為Runnable的封裝後的Task類。也就是說ScheduledThreadPoolExecutor是通過DelayQueue優先順序判定規則來執行任務的。
6、BlockingDque+LinkedBlockingQueue
BlockingDque為阻塞雙端佇列介面,實現類有LinkedBlockingDque。雙端佇列特別之處是它首尾都可以操作。LinkedBlockingDque不同於LinkedBlockingQueue,它只用一個lock來維護讀寫操作,並由這個lock例項化出兩個Condition notEmpty及notFull,而LinkedBlockingQueue讀和寫分別維護一個lock。
相關推薦
Java多執行緒之BlockingQueue深入分析
一、概述: BlockingQueue作為執行緒容器,可以為執行緒同步提供有力的保障。 二、BlockingQueue定義的常用方法 1.BlockingQueue定義的常用方法如下: 丟擲異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, un
Java多執行緒 之BlockingQueue深入分析
二、BlockingQueue定義的常用方法 1.BlockingQueue定義的常用方法如下: 1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則招聘異常 2)off
Java多執行緒之執行緒池深入分析
執行緒池是併發包裡面很重要的一部分,在實際情況中也是使用很多的一個重要元件。 下圖描述的是執行緒池API的一部分。廣義上的完整執行緒池可能還包括Thread/Runnable、Timer/TimerTask等部分。這裡只介紹主要的和高階的API以及架構和原理。 大
Java多執行緒之AQS(AbstractQueuedSynchronizer )實現原理和原始碼分析(三)
章節概覽、 1、回顧 上一章節,我們分析了ReentrantLock的原始碼: 2、AQS 佇列同步器概述 本章節我們深入分析下AQS(AbstractQueuedSynchronizer)佇列同步器原始碼,AQS是用來構建鎖或者其他同步元件的基礎框架。
Java多執行緒之Condition實現原理和原始碼分析(四)
章節概覽、 1、概述 上面的幾個章節我們基於lock(),unlock()方法為入口,深入分析了獨佔鎖的獲取和釋放。這個章節我們在此基礎上,進一步分析AQS是如何實現await,signal功能。其功能上和synchronize的wait,notify一樣。
java多執行緒之併發集合(BlockingQueue)
簡介 實現 package com.np.ota.test.queue; import java.util.concurrent.BlockingQueue; import java.ut
Java多執行緒之ThreadPoolExecutor實現原理和原始碼分析(五)
章節概覽、 1、概述 執行緒池的顧名思義,就是執行緒的一個集合。需要用到執行緒,從集合裡面取出即可。這樣設計主要的作用是優化執行緒的建立和銷燬而造成的資源浪費的情況。Java中的執行緒池的實現主要是JUC下面的ThreadPoolExecutor類完成的。下面
Java多執行緒之ReentrantLock實現原理和原始碼分析(二)
章節概覽、 1、ReentrantLock概述 ReentrantLock字面含義是可重入的互斥鎖,實現了和synchronize關鍵字一樣的獨佔鎖功能。但是ReentrantLock使用的是自旋鎖,通過CAS硬體原語指令實現的輕量級的鎖,不會引起上下文切換
Java多執行緒之深入解析ThreadLocal和ThreadLocalMap
ThreadLocal概述 ThreadLocal是執行緒變數,ThreadLocal中填充的變數屬於當前執行緒,該變數對其他執行緒而言是隔離的。ThreadLocal為變數在每個執行緒中都建立了一個副本,那麼每個執行緒可以訪問自己內部的副本變數。 它具有3個特性: 執行緒併發:在多執行緒併發場景下使用。
Java多執行緒之join()方法
概要 本章,會對Thread中join()方法進行介紹。涉及到的內容包括: 1. join()介紹 2. join()原始碼分析(基於JDK1.7.0_40) 3. join()示例 來源:http://www.cnblogs.com/skywang12345/p/34792
白話理解java多執行緒之join()方法
join字面意思是加入,我理解為插隊. 舉例:媽媽在炒菜,發現沒喲醬油了,讓兒子去打醬油,兒子打完醬油,媽媽炒完菜,全家一起吃 package cn.yh.thread01; /** * * 打醬油的例子 */ public class Demo03 { public stat
細說Java 多執行緒之記憶體可見性
前言: 討論學習Java中的記憶體可見性、Java記憶體模型、指令重排序、as-if-serial語義等多執行緒中偏向底層的一些知識,以及synchronized和volatile實現記憶體可見性的原理和方法。 1、可見性介紹 可見性:一個執行緒對共用變數值的修改,能夠及時地被其他執行緒
Java 多執行緒分段下載原理分析和實現
多執行緒下載介紹 多執行緒下載技術是很常見的一種下載方案,這種方式充分利用了多執行緒的優勢,在同一時間段內通過多個執行緒發起下載請求,將需要下載的資料分割成多個部分,每一個執行緒只負責下載其中一個部分,然後將下載後的資料組裝成完整的資料檔案,這樣便大大加快了下載效率。常見的下載器,迅
java多執行緒之 執行緒協作
也是網上看的一道題目:關於假如有Thread1、Thread2、Thread3、Thread4四條執行緒分別統計C、D、E、F四個盤的大小,所有執行緒都統計完畢交給Thread5執行緒去做彙總,應當如何實現? 蒐集整理了網上朋友提供的方法,主要有: 1. 多執行緒都是Thread或
java多執行緒之鎖機制二
網上看到一個題目,題目是這樣:Java多執行緒,啟動四個執行緒,兩個執行加一,另外兩個執行減一。 針對該問題寫了一個程式,測試通過,如下: class Sync { static int count = 0; public void add() {
java多執行緒之鎖機制一
網上看了一篇關於java synchronized關鍵字使用的很好的文章,現將其簡要總結一下,加深理解。 先總結兩個規則: synchronized鎖住的是括號裡的物件,而不是程式碼。對於非static的synchronized方法,鎖的就是物件本身也就是this。 多個執行緒
java多執行緒之Phaser
java多執行緒技術提供了Phaser工具類,Phaser表示“階段器”,用來解決控制多個執行緒分階段共同完成任務的情景問題。其作用相比CountDownLatch和CyclicBarrier更加靈活,例如有這樣的一個題目:5個學生一起參加考試,一共有三道題,要求所有學生到齊才能開始考試,全部同學都
Java多執行緒之——ThreadLocal
ThreadLocal是什麼:每一個ThreadLocal能夠放一個執行緒級別的變數,也就是說,每一個執行緒有獨自的變數,互不干擾。以此達到執行緒安全的目的,並且一定會安全。 實現原理: 要了解實現原理,我們先看set方法 public void set(T value) { T
java多執行緒之Lock--顯式鎖
Lock與Synchronized簡介 Synchornized相信大家用的已經比較熟悉了,這裡我就不介紹它的用法了 Synchronized被稱為同步鎖或者是隱式鎖,隱式鎖與顯式鎖區別在於,隱式鎖的獲取和釋放都需要出現在一個塊結構中,而且是有順序的,獲取鎖的順序和釋放鎖的順序必須相反,就是說,
Java多執行緒之Executor框架
在前面的這篇文章中介紹了執行緒池的相關知識,現在我們來看一下跟執行緒池相關的框架--Executor。 一.什麼是Executor 1.Executor框架的兩級排程模型 在HotSpot VM的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本地作業系統執