【搞定Java併發程式設計】第22篇:Java中的阻塞佇列 BlockingQueue 詳解
上一篇:Java併發容器之ConcurrentHashMap詳解
本文目錄:
2.3、ArrayBlockingQueue的(阻塞)新增的實現原理
2.3.1、add(E e)方法 和 offer(E e)方法
2.4、ArrayBlockingQueue的(阻塞)移除實現原理
3.2、LinkedBlockingQueue的實現原理概論
3.3.1、add(E e) 和 offer(E e) 方法
4、 LinkedBlockingQueue和ArrayBlockingQueue迥異
推薦幾篇不錯的文章:
1、深入剖析java併發之阻塞佇列LinkedBlockingQueue與ArrayBlockingQueue
本文大部分內容轉載自:https://blog.csdn.net/javazejian/article/details/77410889。還有一小部分出自於《Java併發程式設計的藝術》書中。
本文在開篇介紹了Java提供的7種阻塞佇列的基本概念,原始碼分析中只對ArrayBlockingQueue和LinkedBlockingQueue做了分析,如果想了解其他五種阻塞佇列的原始碼分析,可以閱讀這篇文章:
1、阻塞佇列的基本概念
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加操作支援阻塞的插入和移除方法。
1、支援阻塞的插入方法:當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿;
2、支援阻塞的移除方法:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。
Java中的阻塞佇列介面BlockingQueue繼承自Queue介面,先來看看Queue介面的情況:
public interface Queue<E> extends Collection<E> {
boolean add(E e); // 插入方法
boolean offer(E e); // 插入方法
E remove(); // 刪除方法
E poll(); // 刪除方法
E element(); // 檢查方法
E peek(); // 檢查方法
}
再來看看阻塞佇列介面為我們提供的主要方法:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
}
總結下上訴方法,可以分為以下三類:
方法\處理方式 | 丟擲異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
1、丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
2、返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
3、一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
4、超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。
- 插入方法:
add(E e) : 新增成功返回true,失敗拋IllegalStateException異常;
offer(E e) : 成功返回 true,如果此佇列已滿,則返回 false;
put(E e) :將元素插入此佇列的尾部,如果該佇列已滿,則一直阻塞。
- 刪除方法:
remove(Object o) :移除指定元素,成功返回true,失敗返回false;
poll() : 獲取並移除此佇列的頭元素,若佇列為空,則返回 null;
take():獲取並移除此佇列頭元素,若沒有元素則一直阻塞。
- 檢查方法:
element() :獲取但不移除此佇列的頭元素,沒有元素則拋異常;
peek() :獲取但不移除此佇列的頭;若佇列為空,則返回 null。
- 需要注意的幾個點:
BlockingQueue 不接受 null 值的插入,相應的方法在碰到 null 的插入時會丟擲 NullPointerException 異常。null 值在這裡通常用於作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時候,就不能很好地用 null 來判斷到底是代表失敗,還是獲取的值就是 null 值。
一個 BlockingQueue 可能是有界的,如果在插入的時候,發現佇列滿了,那麼 put 操作將會阻塞。通常,在這裡我們說的無界佇列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。
BlockingQueue 是設計用來實現生產者-消費者佇列的。當然,你也可以將它當做普通的 Collection 來用,前面說了,它實現了 java.util.Collection 介面。例如,我們可以用 remove(x) 來刪除任意一個元素,但是,這類操作通常並不高效,所以儘量只在少數的場合使用,比如一條訊息已經入隊,但是需要做取消操作的時候。
BlockingQueue 的實現都是執行緒安全的,但是批量的集合操作:如 addAll
、containsAll
、retainAll和
removeAll
不一定是原子操作。如 addAll(c) 有可能在添加了一些元素後中途丟擲異常,此時 BlockingQueue 中已經添加了部分元素,這個是允許的,取決於具體的實現。
BlockingQueue 不支援 close 或 shutdown 等關閉操作,此特性取決於具體的實現,不做強制約束。
最後,BlockingQueue 在生產者-消費者的場景中,是支援多消費者和多生產者的,說的其實就是執行緒安全問題。
-
Java裡的阻塞佇列
JDK7提供了7個阻塞佇列。分別是:
1、ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
2、LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
3、PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
4、DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
5、SynchronousQueue:一個不儲存元素的阻塞佇列。
6、LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
7、LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。
ArrayBlockingQueue是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。
預設情況下不保證訪問者公平的訪問佇列,所謂公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素。通常情況下為了保證公平性會降低吞吐量。
-
LinkedBlockingQueue
LinkedBlockingQueue是一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為Integer.MAX_VALUE。此佇列按照先進先出的原則對元素進行排序。
PriorityBlockingQueue是一個支援優先順序的無界佇列。預設情況下元素採取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。
-
DelayQueue
DelayQueue是一個支援延時獲取元素的無界阻塞佇列。佇列使用PriorityQueue來實現。佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。我們可以將
DelayQueue運用在以下應用場景:
1、快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
2、定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
-
SynchronousQueue
SynchronousQueue是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素。
SynchronousQueue可以看成是一個傳球手,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒。佇列本身並不儲存任何元素,非常適合傳遞性場景。
SynchronousQueue的吞吐量高於 LinkedBlockingQueue 和 ArrayBlockingQueue。
-
LinkedTransferQueue
LinkedTransferQueue是一個由連結串列結構組成的無界阻塞TransferQueue佇列。相對於其他阻塞佇列,LinkedBlockingQueue多了tryTransfer和transfer方法。
-
LinkedBlockingDeque
LinkedBlockingDeque是一個由連結串列結構組成的雙向阻塞佇列。所謂雙向佇列指的是可以從佇列的兩端插入和移除元素。
雙向佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。
在初始化LinkedBlockingDeque時可以設定容量防止其過度膨脹。另外,雙向阻塞佇列可以運用在“工作竊取”模式中。
2、ArrayBlockingQueue
下面原始碼部分的講解都是基於JDK1.8。
2.1、ArrayBlockingQueue的基本使用
ArrayBlockingQueue 是一個用陣列實現的有界阻塞佇列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法為新增和刪除的阻塞方法,下面我們通過ArrayBlockingQueue佇列實現一個生產者消費者的案例,通過該案例簡單瞭解其使用方式。
package com.zju.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ArrayBlockingQueueDemo {
private final static ArrayBlockingQueue<Apple> queue = new ArrayBlockingQueue<>(1);
public static void main(String[] args) {
new Thread(new Producer(queue)).start();
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
// 產品類:蘋果
class Apple{
public Apple(){
}
}
// 生產者執行緒
class Producer implements Runnable{
private ArrayBlockingQueue<Apple> mAbq;
public Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue) {
this.mAbq = arrayBlockingQueue;
}
@Override
public void run() {
while(true){
produce();
}
}
private void produce(){
try {
Apple apple = new Apple();
mAbq.put(apple);
System.out.println("生產蘋果:" + apple);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 消費者執行緒
class Consumer implements Runnable{
private ArrayBlockingQueue<Apple> mAbq;
public Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue) {
this.mAbq = arrayBlockingQueue;
}
@Override
public void run() {
while(true){
try {
TimeUnit.MICROSECONDS.sleep(1000);
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void consume() throws InterruptedException{
Apple apple = mAbq.take();
System.out.println("消費蘋果:" + apple);
}
}
程式碼比較簡單, Consumer 消費者和 Producer 生產者,通過ArrayBlockingQueue 佇列獲取和新增元素,其中消費者呼叫了take()方法獲取元素當佇列沒有元素就阻塞,生產者呼叫put()方法新增元素,當佇列滿時就阻塞,通過這種方式便實現生產者消費者模式。比直接使用等待喚醒機制或者Condition條件佇列來得更加簡單。執行程式碼,列印部分Log如下:
有點需要注意的是ArrayBlockingQueue內部的阻塞佇列是通過重入鎖ReenterLock和Condition條件佇列實現的,所以ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別。對於公平訪問佇列,被阻塞的執行緒可以按照阻塞的先後順序訪問佇列,即先阻塞的執行緒先訪問佇列。而非公平佇列,當佇列可用時,阻塞的執行緒將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的先後順序。建立公平與非公平阻塞佇列程式碼如下:
// 預設非公平阻塞佇列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
// 公平阻塞佇列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
// 構造方法原始碼
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
除了常用的put、take等方法外,其他方法如下:
// 自動移除此佇列中的所有元素。
void clear()
// 如果此佇列包含指定的元素,則返回 true。
boolean contains(Object o)
// 移除此佇列中所有可用的元素,並將它們新增到給定collection中。
int drainTo(Collection<? super E> c)
// 最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在此佇列中的元素上按適當順序進行迭代的迭代器。
Iterator<E> iterator()
// 返回佇列還能新增元素的數量
int remainingCapacity()
// 返回此佇列中元素的數量。
int size()
// 返回一個按適當順序包含此佇列中所有元素的陣列。
Object[] toArray()
// 返回一個按適當順序包含此佇列中所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別。
<T> T[] toArray(T[] a)
2.2、ArrayBlockingQueue原理概要
BlockingQueue的介面資訊,上面已經表訴的很清楚了,為了後文中更好的理解ArrayBlockingQueue,現在我們來看看它和Queue以及AbstractQueue之間的關係。
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {
protected AbstractQueue() {
}
public boolean add(E e) {
...省略
}
public E remove() {
...省略
}
public E element() {
...省略
}
public void clear() {
...省略
}
public boolean addAll(Collection<? extends E> c) {
...省略
}
}
ArrayBlockingQueue繼承了AbstractQueue、實現了BlockingQueue介面,其內部是通過一個可重入鎖ReentrantLock和兩個Condition條件物件來實現阻塞,這裡先看看其內部成員變數。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
// 儲存資料的陣列
final Object[] items;
// 獲取資料的索引,主要用於take,poll,peek,remove方法
int takeIndex;
// 新增資料的索引,主要用於 put, offer, or add 方法
int putIndex;
// 佇列元素的個數
int count;
// 控制併發訪問的鎖
final ReentrantLock lock;
// notEmpty條件物件,用於通知take方法佇列已有元素,可執行獲取操作
private final Condition notEmpty;
// notFull條件物件,用於通知put方法佇列未滿,可執行新增操作
private final Condition notFull;
// 迭代器
transient Itrs itrs = null;
......
}
從成員變數可看出,ArrayBlockingQueue內部確實是通過陣列物件items來儲存所有的資料,值得注意的是ArrayBlockingQueue通過一個ReentrantLock來同時控制新增執行緒與移除執行緒的併發訪問,這點與LinkedBlockingQueue區別很大(稍後會分析)。而對於notEmpty條件物件則是用於存放等待或喚醒呼叫take方法的執行緒,告訴他們佇列已有元素,可以執行獲取操作。同理notFull條件物件是用於等待或喚醒呼叫put方法的執行緒,告訴它們,佇列未滿,可以執行新增元素的操作。takeIndex代表的是下一個方法(take,poll,peek,remove)被呼叫時獲取陣列元素的索引,putIndex則代表下一個方法(put, offer, or add)被呼叫時元素新增到陣列中的索引。圖示如下 :
2.3、ArrayBlockingQueue的(阻塞)新增的實現原理
2.3.1、add(E e)方法 和 offer(E e)方法
- 第1步:呼叫ArrayBlockingQueue中的add(E e)方法
public boolean add(E e) {
return super.add(e);
}
可以看到add方法實際上呼叫的是ArrayBlockingQueue中的add(E e)方法。
- 第2步:呼叫AbstractQueue中的add(E e)方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
可以發現ArrayBlockingQueue中的add方法又呼叫了其子類ArrayBlockingQueue中的offer(E e)方法。
- 第3步:呼叫ArrayBlockingQueue中的offer(E e)方法
public boolean offer(E e) {
checkNotNull(e); // 檢查元素是否為null
final ReentrantLock lock = this.lock;
lock.lock(); // 加鎖
try {
if (count == items.length) // 判斷佇列是否滿
return false;
else {
enqueue(e); // 新增元素到佇列
return true;
}
} finally {
lock.unlock();
}
}
- 第4步:呼叫ArrayBlockingQueue中的enqueue(E e)方法,入隊操作
private void enqueue(E x) {
// 獲取當前陣列
final Object[] items = this.items;
// 通過putIndex索引對陣列進行賦值
items[putIndex] = x;
// 索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++; // 佇列中元素數量加1
// 喚醒呼叫take()方法的執行緒,執行元素獲取操作。
notEmpty.signal();
}
這裡的add方法和offer方法實現比較簡單,其中需要注意的是enqueue(E x)方法,其方法內部通過putIndex索引直接將元素新增到陣列items中。
這裡可能會疑惑的是:當putIndex索引大小等於陣列長度時,需要將putIndex重新設定為0,這是因為當前佇列執行元素獲取時總是從佇列頭部獲取,而新增元素從中從佇列尾部獲取,所以當佇列索引(從0開始)與陣列長度相等時,下次我們就需要從陣列頭部開始添加了,如下圖演示 :
2.3.2、put(E e)方法
put方法是一個阻塞新增方法,即阻塞時可中斷。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 該方法可中斷
try {
// 當佇列元素個數與陣列長度相等時,無法新增元素
while (count == items.length)
// 將當前呼叫執行緒掛起,新增到notFull條件佇列中等待喚醒
notFull.await();
enqueue(e); // 如果佇列沒有滿直接新增。。
} finally {
lock.unlock();
}
}
put方法是一個阻塞的方法,如果佇列元素已滿,那麼當前執行緒將會被notFull條件物件掛起加到等待佇列中,直到佇列有空位才會喚醒新增操作。但如果佇列沒有滿,那麼就直接呼叫enqueue(e)方法將元素加入到陣列佇列中。
到此我們對三個新增方法即put,offer,add都分析完畢,其中offer,add在正常情況下都是無阻塞的新增,而put方法是阻塞新增。這就是阻塞佇列的新增過程。說白了就是當佇列滿時通過條件物件Condtion來阻塞當前呼叫put方法的執行緒,直到執行緒又再次被喚醒執行。總得來說新增執行緒的執行存在以下兩種情況:
1、佇列已滿,那麼新到來的 put 執行緒將新增到 notFull 的條件佇列中等待;
2、有移除執行緒執行移除操作,移除成功同時喚醒 put 執行緒。
如下圖所示 :
2.4、ArrayBlockingQueue的(阻塞)移除實現原理
2.4.1、poll()方法
poll:該方法獲取並移除此佇列的頭元素,若佇列為空,則返回 null 。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 判斷佇列是否為null,不為null執行dequeue()方法,否則返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 刪除佇列頭元素並返回
private E dequeue() {
// 拿到當前陣列的資料
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 獲取要刪除的物件
E x = (E) items[takeIndex];
// 將陣列中takeIndex索引位置設定為null
items[takeIndex] = null;
// takeIndex索引加1並判斷是否與陣列長度相等,
// 如果相等說明已到盡頭,恢復為0
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 佇列個數減1
if (itrs != null)
itrs.elementDequeued(); // 同時更新迭代器中的元素資料
// 刪除了元素說明佇列有空位,喚醒notFull條件物件新增執行緒,執行新增操作
notFull.signal();
return x;
}
poll():獲取並刪除佇列頭元素,佇列沒有資料就返回null,內部通過dequeue()方法刪除頭元素,註釋很清晰,這裡不重複了。
2.4.2、remove(Object o)方法
public boolean remove(Object o) {
if (o == null) return false;
// 獲取陣列資料
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock(); // 加鎖
try {
// 如果此時佇列不為null,這裡是為了防止併發情況
if (count > 0) { // count為佇列中元素的個數
// 獲取下一個要新增元素時的索引
final int putIndex = this.putIndex;
// 獲取當前要被刪除元素的索引
int i = takeIndex;
// 執行迴圈查詢要刪除的元素
do {
// 找到要刪除的元素
if (o.equals(items[i])) {
removeAt(i); // 執行刪除
return true; // 刪除成功返回true
}
// 當前刪除索引執行加1後判斷是否與陣列長度相等
// 若為true,說明索引已到陣列盡頭,將i設定為0
if (++i == items.length)
i = 0;
} while (i != putIndex); // 繼承查詢
}
return false;
} finally {
lock.unlock();
}
}
// 根據索引刪除元素,實際上是把刪除索引之後的元素往前移動一個位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 先判斷要刪除的元素是否為當前佇列頭元素
if (removeIndex == takeIndex) {
// 如果是直接刪除
items[takeIndex] = null;
// 當前佇列頭元素加1並判斷是否與陣列長度相等,若為true設定為0
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 佇列元素減1
if (itrs != null)
itrs.elementDequeued(); // 更新迭代器中的資料
} else {
// 如果要刪除的元素不在佇列頭部,
// 那麼只需迴圈迭代把刪除元素後面的所有元素往前移動一個位置
// 獲取下一個要被新增的元素的索引,作為迴圈判斷結束條件
final int putIndex = this.putIndex;
// 執行迴圈
for (int i = removeIndex;;) {
// 獲取要刪除節點索引的下一個索引
int next = i + 1;
// 判斷是否已為陣列長度,如果是從陣列頭部(索引為0)開始找
if (next == items.length)
next = 0;
// 如果查詢的索引不等於要新增元素的索引,說明元素可以再移動
if (next != putIndex) {
items[i] = items[next]; // 把後一個元素前移覆蓋要刪除的元
i = next;
} else {
// 在removeIndex索引之後的元素都往前移動完畢後清空最後一個元素
items[i] = null;
this.putIndex = i;
break; // 結束迴圈
}
}
count--; // 佇列元素減1
if (itrs != null)
itrs.removedAt(removeIndex); // 更新迭代器資料
}
notFull.signal(); // 喚醒新增執行緒
}
remove(Object o)方法的刪除過程相對複雜些,因為該方法並不是直接從佇列頭部刪除元素。首先執行緒先獲取鎖,再一步判斷佇列 count > 0, 這點是保證併發情況下刪除操作安全執行。接著獲取下一個要新增源的索引 putIndex 以及 takeIndex 索引 ,作為後續迴圈的結束判斷,因為只要 putIndex 與 takeIndex 不相等就說明佇列沒有結束。然後通過while迴圈找到要刪除的元素索引,執行 removeAt(i) 方法刪除。
在 removeAt(i) 方法中實際上做了兩件事,一是首先判斷佇列頭部元素是否為刪除元素,如果是直接刪除,並喚醒新增執行緒;二是如果要刪除的元素並不是佇列頭元素,那麼執行迴圈操作,從要刪除元素的索引removeIndex之後的元素都往前移動一個位置,那麼要刪除的元素就被removeIndex之後的元素替換,從而也就完成了刪除操作。
2.4.3、take()方法
take()方法:是一個阻塞方法,直接獲取佇列頭元素並刪除。
// 從佇列頭部刪除,佇列沒有元素就阻塞,可中斷
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //中斷
try {
// 如果佇列沒有元素
while (count == 0)
// 執行阻塞操作
notEmpty.await(); // 等待notEmpty條件
return dequeue(); // 如果佇列有元素執行刪除操作
} finally {
lock.unlock();
}
}
take 方法其實很簡單,有就刪除,沒有就阻塞。注意這個阻塞是可以中斷的,如果佇列沒有資料那麼就加入notEmpty條件佇列等待(有資料就直接取走,方法結束),如果有新的 put 執行緒添加了資料,那麼 put 操作將會喚醒 take 執行緒,執行 take 操作。圖示如下:
2.4.4、peek()方法
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 直接返回當前佇列的頭元素,但不刪除
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
// 返回佇列陣列下標為i的元素
final E itemAt(int i) {
return (E) items[i];
}
peek方法非常簡單,直接返回當前佇列的頭元素但不刪除任何元素。
ok~,到此對於ArrayBlockingQueue的主要方法就分析完了!
3、LinkedBlockingQueue
3.1、LinkedBlockingQueue的基本概要
LinkedBlockingQueue 是一個由連結串列實現的有界佇列阻塞佇列,但大小預設值為Integer.MAX_VALUE。所以我們在使用 LinkedBlockingQueue 時建議手動傳值,讓其提供我們所需的大小,避免佇列過大造成機器負載或者記憶體爆滿等情況。其建構函式如下:
// 預設大小為Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 建立指定大小為capacity的阻塞佇列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 建立大小預設值為Integer.MAX_VALUE的阻塞佇列並新增c中的元素到阻塞佇列
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
從原始碼看,有三種方式可以構造LinkedBlockingQueue。通常情況下,我們建議建立指定大小的LinkedBlockingQueue阻塞佇列,即上訴程式碼中的第2種。
LinkedBlockingQueue 佇列也是按 FIFO(先進先出)排序元素。佇列的頭部是在佇列中時間最長的元素,佇列的尾部是在佇列中時間最短的元素,新元素插入到佇列的尾部,而佇列執行獲取操作會獲得位於佇列頭部的元素。
在正常情況下,基於連結串列的佇列的吞吐量要高於基於陣列的佇列(ArrayBlockingQueue),因為其內部實現新增和刪除操作使用了兩個ReenterLock來控制併發執行,而ArrayBlockingQueue內部只是使用一個ReenterLock控制併發,因此LinkedBlockingQueue的吞吐量要高於ArrayBlockingQueue。
注意LinkedBlockingQueue和ArrayBlockingQueue的 API 幾乎是一樣的,但它們的內部實現原理不太相同,這點稍後會分析。使用LinkedBlockingQueue,我們同樣也能實現生產者消費者模式。只需把前面ArrayBlockingQueue案例中的阻塞佇列物件換成LinkedBlockingQueue即可。這裡限於篇幅就不貼重複程式碼了。接下來我們重點分析LinkedBlockingQueue的內部實現原理,最後我們將對ArrayBlockingQueue和LinkedBlockingQueue 做總結,闡明它們間的不同之處。
3.2、LinkedBlockingQueue的實現原理概論
LinkedBlockingQueue是一個基於連結串列的阻塞佇列,其內部維持一個基於連結串列的資料佇列,實際上我們對LinkedBlockingQueue的 API 操作都是間接操作該資料佇列,這裡我們先看看LinkedBlockingQueue的內部成員變數。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 節點類,用於儲存資料
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 阻塞佇列的大小,預設為Integer.MAX_VALUE
private final int capacity;
// 當前阻塞佇列中的元素個數
private final AtomicInteger count = new AtomicInteger();
// 阻塞佇列的頭結點
transient Node<E> head;
// 阻塞佇列的尾節點
private transient Node<E> last;
// 獲取並移除元素時使用的鎖,如take, poll, etc
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty條件物件,當佇列沒有資料時,用於掛起執行刪除的執行緒
private final Condition notEmpty = takeLock.newCondition();
// 新增元素時使用的鎖如 put, offer, etc
private final ReentrantLock putLock = new ReentrantLock();
// notFull條件物件,當佇列資料已滿時,用於掛起執行新增的執行緒
private final Condition notFull = putLock.newCondition();
}
從上述可看成,每個新增到LinkedBlockingQueue佇列中的資料都將被封裝成Node節點,新增的連結串列佇列中,其中head和last分別指向佇列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對併發進行控制。也就是說,新增和刪除操作並不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。
這裡再次強調如果沒有給LinkedBlockingQueue指定容量大小,其預設值將是Integer.MAX_VALUE,如果存在新增速度大於刪除速度時候,有可能會記憶體溢位。這點在使用前希望慎重考慮。至於LinkedBlockingQueue的實現原理圖與ArrayBlockingQueue是類似的,除了對新增和移除方法使用單獨的鎖控制外,兩者都使用了不同的Condition條件物件作為等待佇列,用於掛起take執行緒和put執行緒。
3.3、新增方法的實現原理
對於新增方法,主要指的是add,offer以及put。
3.3.1、add(E e) 和 offer(E e) 方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
從原始碼可以看出,add方法間接呼叫的是offer方法,如果add方法新增失敗將丟擲IllegalStateException異常,新增成功則返回true,那麼下面我們直接看看offer的相關方法實現。
public boolean offer(E e) {
// 新增元素為null直接丟擲異常
if (e == null) throw new NullPointerException();
// 獲取佇列的個數
final AtomicInteger count = this.count;
// 判斷佇列是否已滿
if (count.get() == capacity)
return false;
int c = -1;
// 構建節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 再次判斷佇列是否已滿,考慮併發情況
if (count.get() < capacity) {
enqueue(node); // 新增元素
c = count.getAndIncrement(); // 拿到當前未新增新元素時的佇列長度
//如果容量還沒滿
if (c + 1 < capacity)
notFull.signal(); // 喚醒下一個新增執行緒,執行新增操作
}
} finally {
putLock.unlock();
}
// 由於存在新增鎖和消費鎖,而消費鎖和新增鎖都會持續喚醒等待執行緒,因此count肯定會變化。
// 這裡的if條件表示如果佇列中還有1條資料
if (c == 0)
signalNotEmpty(); // 如果還存在資料那麼就喚醒消費鎖
return c >= 0; // 新增成功返回true,否則返回false
}
// 入隊操作
private void enqueue(Node<E> node) {
// 佇列尾節點指向新的node節點
last = last.next = node;
}
// signalNotEmpty方法
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
// 喚醒獲取並刪除元素的執行緒
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
這裡的offer()方法做了兩件事:
第一件事是:判斷佇列是否滿,滿了就直接釋放鎖,沒滿就將節點封裝成Node入隊,然後再次判斷佇列新增完成後是否已滿,不滿就繼續喚醒等到在條件物件notFull上的新增執行緒;
第二件事是:判斷是否需要喚醒等待在notEmpty條件物件上的消費執行緒。
這裡我們可能會有點疑惑,為什麼新增完成後是繼續喚醒在條件物件notFull上的新增執行緒而不是像ArrayBlockingQueue那樣直接喚醒notEmpty條件物件上的消費執行緒?而又為什麼要當if (c == 0)時才去喚醒消費執行緒呢?
喚醒新增執行緒的原因:在新增新元素完成後,會判斷佇列是否已滿,不滿就繼續喚醒在條件物件notFull上的新增執行緒,這點與前面分析的ArrayBlockingQueue很不相同。在ArrayBlockingQueue內部完成新增操作後,會直接喚醒消費執行緒對元素進行獲取,這是因為ArrayBlockingQueue只用了一個ReenterLock同時對新增執行緒和消費執行緒進行控制,這樣如果在新增完成後再次喚醒新增執行緒的話,消費執行緒可能永遠無法執行。而對於LinkedBlockingQueue來說就不一樣了,其內部對新增執行緒和消費執行緒分別使用了各自的ReenterLock鎖對併發進行控制,也就是說新增執行緒和消費執行緒是不會互斥的,所以新增鎖只要管好自己的新增執行緒即可,新增執行緒自己直接喚醒自己的其他新增執行緒,如果沒有等待的新增執行緒,直接結束了。如果有就直到佇列元素已滿才結束掛起,當然offer方法並不會掛起,而是直接結束,只有put方法才會當佇列滿時才執行掛起操作。注意消費執行緒的執行過程也是如此。這也是為什麼LinkedBlockingQueue的吞吐量要相對大些的原因。
為什麼要判斷if (c == 0)時才去喚醒消費執行緒呢?
這是因為消費執行緒一旦被喚醒是一直在消費的(前提是有資料),所以c值是一直在變化的,c值是新增完元素前佇列的大小,此時c只可能是0或c>0。
如果是 c = 0,那麼說明之前消費執行緒已停止,條件物件上可能存在等待的消費執行緒。新增完資料後應該是c+1,那麼有資料就直接喚醒等待消費執行緒,如果沒有就結束啦,等待下一次的消費操作。
如果 c > 0 那麼消費執行緒就不會被喚醒,只能等待下一個消費操作(poll、take、remove)的呼叫。那為什麼不是條件c > 0才去喚醒呢?我們要明白的是消費執行緒一旦被喚醒會和新增執行緒一樣,一直不斷喚醒其他消費執行緒,如果新增前c>0,那麼很可能上一次呼叫的消費執行緒後,資料並沒有被消費完,條件佇列上也就不存在等待的消費執行緒了,所以c>0喚醒消費執行緒得意義不是很大,當然如果新增執行緒一直新增元素,那麼一直c>0,消費執行緒執行的快就要等待下一次呼叫消費操作了(poll、take、remove)。
3.4、移除方法的實現原理
關於移除的方法主要是指remove和poll以及take方法,下面一一分析。
3.4.1、remove方法
public boolean remove(Object o) {
if (o == null) return false;
fullyLock(); // 同時對putLock和takeLock加鎖
try {
// 迴圈查詢要刪除的元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) { // 找到要刪除的節點
unlink(p, trail); // 直接刪除
return true;
}
}
return false;
} finally {
fullyUnlock(); // 解鎖
}
}
// 兩個同時加鎖
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
remove方法刪除指定的物件,這裡我們可能會詫異,為什麼同時對putLock和takeLock加鎖?
這是因為remove方法刪除的資料的位置不確定,為了避免造成並非安全問題,所以需要對2個鎖同時加鎖。
3.4.2、poll方法
public E poll() {
// 獲取當前佇列的大小
final AtomicInteger count = this.count;
if (count.get() == 0) // 如果沒有元素直接返回null
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 判斷佇列是否有資料
if (count.get() > 0) {
// 如果有,直接刪除並獲取該元素值
x = dequeue();
// 當前佇列大小減一
c = count.getAndDecrement();
// 如果佇列未空,繼續喚醒等待在條件物件notEmpty上的消費執行緒
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 判斷c是否等於capacity,這是因為如果滿說明NotFull條件物件上
// 可能存在等待的新增執行緒
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head; // 獲取頭結點
Node<E> first = h.next; // 獲取頭結的下一個節點(要刪除的節點)
h.next = h; // 自己next指向自己,即被刪除
head = first; // 更新頭結點
E x = first.item; // 獲取刪除節點的值
first.item = null; // 清空資料,因為first變成頭結點是不能帶資料的,這樣也就刪除佇列的帶資料的第一個節點
return x;
}
poll方法也比較簡單,如果佇列沒有資料就返回null,如果佇列有資料,那麼就取出來,如果佇列還有資料那麼喚醒等待在條件物件notEmpty上的消費執行緒。然後判斷if (c == capacity)為true就喚醒新增執行緒,這點與前面分析if(c==0)是一樣的道理。因為只有可能佇列滿了,notFull條件物件上才可能存在等待的新增執行緒。
3.4.3、take方法
public E take() throws InterruptedException {
E x;
int c = -1;
// 獲取當前佇列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中斷
try {
// 如果佇列沒有資料,掛起當前執行緒到條件物件的等待佇列中
while (count.get() == 0) {
notEmpty.await();
}
// 如果存在資料直接刪除並返回該資料
x = dequeue();
c = count.getAndDecrement(); // 佇列大小減1
if (c > 1)
notEmpty.signal(); // 還有資料就喚醒後續的消費執行緒
} finally {
takeLock.unlock();
}
// 滿足條件,喚醒條件物件上等待佇列中的新增執行緒
if (c == capacity)
signalNotFull();
return x;
}
take方法是一個可阻塞可中斷的移除方法,主要做了兩件事:
一是:如果佇列沒有資料就掛起當前執行緒到 notEmpty 條件物件的等待佇列中一直等待,如果有資料就刪除節點並返回資料項,同時喚醒後續消費執行緒;
二是:嘗試喚醒條件物件 notFull 上等待佇列中的新增執行緒。
到此關於remove、poll、take的實現也分析完了,其中只有take方法具備阻塞功能。remove方法則是成功返回true失敗返回false,poll方法成功返回被移除的值,失敗或沒資料返回null。
下面再看看兩個檢查方法,即peek和element。
3.5、檢查方法的實現原理
兩個檢查方法,即peek和element。
// 構造方法,head 節點不存放資料
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public E element() {
E x = peek(); // 直接呼叫peek
if (x != null)
return x;
else
throw new NoSuchElementException(); // 沒資料拋異常
}
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 獲取頭結節點的下一個節點
Node<E> first = head.next;
if (first == null)
return null; // 為null就返回null
else
return first.item; // 返回值
} finally {
takeLock.unlock();
}
}
從程式碼來看,head頭結節點在初始化時是本身是不帶資料的,僅僅作為頭部head方便我們執行連結串列的相關操作。
peek返回直接獲取頭結點的下一個節點返回其值,如果沒有值就返回null,有值就返回節點對應的值。
element方法內部呼叫的是peek,有資料就返回,沒資料就拋異常。
下面我們最後來看兩個根據時間阻塞的方法,比較有意思,利用的Conditin來實現的。
3.6、時間阻塞的方法
3.6.1、offer(E e, long timeout, TimeUnit unit)
// 在指定時間內阻塞新增的方法,超時就結束
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
// 將時間轉換成納秒
long nanos = unit.toNanos(timeout);
int c = -1;
// 獲取鎖
final ReentrantLock putLock = this.putLock;
// 獲取當前佇列大小
final AtomicInteger count = this.count;
// 鎖中斷(如果需要)
putLock.lockInterruptibly();
try {
// 判斷佇列是否滿
while (count.get() == capacity) {
if (nanos <= 0)
return false;
// 如果佇列滿了,則根據等待時間阻塞等待
nanos = notFull.awaitNanos(nanos);
}
// 佇列沒滿直接入隊
enqueue(new Node<E>(e));
c = count.getAndIncrement();
// 喚醒條件物件上等待的執行緒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 喚醒消費執行緒
if (c == 0)
signalNotEmpty();
return true;
}
對於這個offer方法,我們重點來看看阻塞的這段程式碼
// 判斷佇列是否滿
while (count.get() == capacity) {
if (nanos <= 0)
return false;
// 如果佇列滿根據阻塞的等待
nanos = notFull.awaitNanos(nanos);
}
// CoditionObject(Codition的實現類)中的awaitNanos方法
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 這裡是將當前新增執行緒封裝成node節點加入Condition的等待佇列中
// 注意這裡的node是AQS的內部類Node
Node node = addConditionWaiter();
// 加入等待,那麼就釋放當前執行緒持有的鎖
int savedState = fullyRelease(node);
// 計算過期時間
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 主要看這裡!!由於是while 迴圈,這裡會不斷判斷等待時間
// nanosTimeout 是否超時
// static final long spinForTimeoutThreshold = 1000L;