Java 阻塞隊列
轉自《Java並發編程的藝術》
什麽是阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法。
- 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
- 支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。
阻塞隊列常用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。
在阻塞隊列不可用時,這兩個附加操作提供了4種處理方式,詳見下表:
插入和移除操作的4種處理方式
- 拋出異常:當隊列滿時,如果再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException異常。
- 返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊列裏取出一個元素,如果沒有則返回null。
- 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列裏take元素,隊列會阻塞消費者線程,直到隊列不為空。
- 超時退出:當阻塞隊列滿時,如果生產者線程往隊列裏插入元素,隊列會阻塞生產者線程一段時間,如果超過了指定的時間,生產者線程就會退出。
Java裏的阻塞隊列
JDK 7 提供了7個阻塞隊列,如下:
- ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
- DelayQueue : 一個使用優先級隊列實現的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
1. ArrayBlockingQueue
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。
默認情況下不保證線程公平的訪問隊列,所謂公平訪問隊列是指阻塞的線程,可以按照阻塞的先後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程時非公平的,當隊列可用時,阻塞的線程都可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列。為了保證公平性,通常會降低吞吐量。
公平與非公平可以在構建阻塞隊列時進行設置。ArrayBlockingQueue的公平性是通過ReentrantLock實現的。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
LinkedBlockingQueue
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
PriorityBlockingQueue
PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認情況下元素采取自然順序升序排列。也可以自定義類實現compareTo()方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。需要註意的是不能保證同優先級元素的順序。
DelayQueue
Delayed是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。
DelayQueue非常有用,可以將DelayQueue運用在以下應用場景。
- 緩存系統的設計:使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
- 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,比如TimerQueue就是使用DelayQueue實現的。
(1). 如何實現Delayed接口
DelayQueue隊列的元素必須實現Delayed接口。我們可以參考ScheduledThreadPoolExecutor裏ScheduledFutureTask類的實現,一共有三步。
SynchronousQueue
SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。
它支持公平訪問隊列。默認情況下線程采用非公平性策略訪問隊列。使用以下構造方法可以創建公平性訪問的SynchronousQueue,如果設置為true,則等待的線程會采用先進先出的順序訪問隊列。
public SynchronousQueue(boolean fair) {
transferer = fair new TransferQueue() : new TransferStack();
}
SynchronousQueue可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身並不存儲任何元素,非常適合傳遞性場景。SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue。
LinkedTransferQueue
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
(1). transfer方法
如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。transfer方法的關鍵代碼如下。
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代碼是試圖把存放當前元素的s節點作為tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其他線程。
(2). tryTransfer方法
tryTransfer方法是用來試探生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法是必須等到消費者消費了才返回。
對於帶有時間限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。
LinkedBlockingDeque
LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙端隊列的最後一個元素。另外,插入方法add等同於addLast,移除方法remove等效於removeFirst。但是take方法卻等同於takeFirst,不知道是不是JDK的bug,使用時還是用帶有First和Last後綴的方法更清楚。
在初始化LinkedBlockingDeque時可以設置容量防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。
阻塞隊列的實現原理
如果隊列是空的,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,如何讓生產者和消費者進行高效的通信呢?讓我們先來看看JDK是如何實現的。
- 使用通知模式
所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。通過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略部分代碼片段
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 看這裏
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
當往隊列裏插入一個元素時,如果隊列不可用,那麽阻塞生產者主要通過
LockSupport.park(this)來實現。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Java 阻塞隊列