Java併發包下的阻塞佇列
本文簡要介紹一下什麼是阻塞佇列,Java併發包給我們提供的阻塞佇列有哪些,以及怎麼去簡單使用
阻塞佇列 BlockingQueue
1. 簡單概念
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列:支援阻塞的插入和移除:
-
支援阻塞的插入方法:當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列變為不滿
-
支援阻塞的移除方法:當佇列為空時,獲取元素的執行緒會等待佇列變為非空
阻塞佇列的使用場景: 常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。阻塞佇列就是生產者用來存放元素、生產者用來獲取元素的容器
2. API介紹
BlockingQueue提供子類實現的幾個API如下:
- add()和remove()
- offer()和poll()
- put()和take()
// 將指定元素插入佇列,成功返回true;不成功返回false
boolean add(E e);
// 從佇列中移出指定元素並返回該元素
boolean remove(Object o);
// 將指定元素插入佇列,在使用有容量限制的佇列時,該方法優於add()
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 獲取佇列的頭元素並將其從佇列中移除,如果佇列為空返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 將指定的元素插入佇列中
void put(E e) throws InterruptedException;
// 獲取佇列的頭元素並將其從佇列中移除
E take() throws InterruptedException;
阻塞佇列的方法總結如下:
方法/處理方式 | 丟擲異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove(e) | poll() | take() | poll(time, unit) |
-
丟擲異常:當佇列滿時往佇列裡面插入元素,會丟擲 IllegalStateException 異常;當佇列為空時從佇列裡面獲取元素會丟擲 NoSuchElementException 異常
-
返回特殊值:當往佇列裡插入元素時會返回元素是否插入成功,成功則返回true;移除方法,會獲取佇列的頭元素並將其從佇列中移除,如果佇列為空返回null
-
一直阻塞:當佇列滿時,如果生產者執行緒往佇列裡面put元素,佇列會一直阻塞生產者執行緒,直到佇列可用或者響應中斷退出;當佇列為空時,如果消費者執行緒從佇列裡take元素,則佇列會阻塞住消費者執行緒,直到佇列不為空。
-
超時退出:當阻塞佇列滿時,如果生產者執行緒往佇列裡面插入元素,佇列會阻塞生產者執行緒一段時間,超過指定時間後生產者執行緒將會退出
注意: 上面所說的佇列滿時插入元素的執行緒阻塞是針對有界佇列的,如果是無界佇列,佇列不會出現滿的情況,所以使用put或offer方法永遠不會被阻塞,而且使用offer()方法會永遠返回true。
BlockingQueue的實現類——七大阻塞佇列
下圖是BlockingQueue的繼承體系:
- ArrayBlockingQueue:一個由陣列實現的有界阻塞佇列
- LinkedBlockingQueue:一個由連結串列實現的有界阻塞佇列
- LinkedBlockingDeque:一個由連結串列實現的雙向有界阻塞佇列
- PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列
- SynchronousQueue:一個不儲存元素的阻塞佇列
- LinkedTransferQueue:一個由連結串列實現的無界阻塞佇列
- DelayQueue:一個由優先順序佇列實現的無界阻塞佇列
1. ArrayBlockingQueue
ArrayBlockingQueue是一個由陣列實現的有界阻塞佇列。按照FIFO的原則對元素進行排序。佇列使用可重入鎖(ReentrantLock)對同步資源加鎖,預設情況下不保證執行緒公平的訪問佇列(非公平鎖)。下面是ArrayBlockingQueue原始碼中的幾個成員變數:
// 儲存佇列元素的陣列
final Object[] items;
// 將要取出元素的索引
int takeIndex;
// 將要新增元素的索引
int putIndex;
// 佇列中已新增元素的數量
int count;
// 可重入鎖
final ReentrantLock lock;
// 取元素執行緒等待佇列:佇列非空可獲取
private final Condition notEmpty;
// 插入元素執行緒等待佇列:佇列非滿可插入
private final Condition notFull;
如下是ArrayBlockingQueue的構造器原始碼:
// 傳入一個容量
public ArrayBlockingQueue(int capacity) {
// 預設是非公平的
this(capacity, false);
}
// 可以傳入一個boolean值,true是公平鎖
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
分析ArrayBlockingQueue的put()和take()方法的原始碼:
- put() 方法
// 插入元素
public void put(E e) throws InterruptedException {
// 檢查摻入的元素是否為空,如果為空丟擲NullPointerException異常
checkNotNull(e);
// 獲取可重入鎖
final ReentrantLock lock = this.lock;
// 可響應中斷
lock.lockInterruptibly();
try {
// 如果佇列已滿,則摻入元素的執行緒等待,直到佇列不滿時由獲取元素執行緒喚醒
while (count == items.length)
// 等待
notFull.await();
// 佇列不滿,插入元素
enqueue(e);
} finally {
lock.unlock();
}
}
// 向佇列中插入元素
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 喚醒取元素執行緒,可以取元素了
notEmpty.signal();
}
- take() 方法
// 獲取元素
public E take() throws InterruptedException {
// 獲得鎖
final ReentrantLock lock = this.lock;
// 響應中斷
lock.lockInterruptibly();
try {
// 當佇列為空,獲取元素的執行緒等待,直到佇列不為空時由插入元素執行緒喚醒
while (count == 0)
// 等待
notEmpty.await();
// 返回取到的元素
return dequeue();
} finally {
lock.unlock();
}
}
// 從佇列中獲取元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
喚醒等待的插入元素執行緒,可以插入元素了
notFull.signal();
return x;
}
使用ArrayBlockingQueue實現一個簡單的生產消費場景:
public class TestArrayBlockingQueue {
public static void main(String[] args) {
// 容量為3的阻塞佇列
final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
// 兩個生產者執行緒生產資料
for (int i = 0; i < 2; i++) {
new Thread() {
@Override
public void run() {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " 準備插入資料 " +
(queue.size() == 3 ? "...佇列已滿,正在等待..." : "..."));
queue.put(1);
System.out.println(Thread.currentThread().getName() + " 插入資料," +
"佇列目前有 " + queue.size() + " 個數據.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
// 一個消費者執行緒消費資料
new Thread() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " 準備取出資料 " +
(queue.size() == 0 ? "...佇列已空,正在等待..." : "..."));
queue.take();
System.out.println(Thread.currentThread().getName() + " 取出資料," +
"佇列目前有 " + queue.size() + " 個數據.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
結果如下:
Thread-2 準備取出資料 ...佇列已空,正在等待...
Thread-0 準備插入資料 ...
Thread-1 準備插入資料 ...
Thread-1 插入資料,佇列目前有 2 個數據.
Thread-0 插入資料,佇列目前有 2 個數據.
Thread-2 取出資料,佇列目前有 1 個數據.
Thread-0 準備插入資料 ...
Thread-1 準備插入資料 ...
Thread-0 插入資料,佇列目前有 2 個數據.
Thread-1 插入資料,佇列目前有 3 個數據.
Thread-0 準備插入資料 ...佇列已滿,正在等待...
Thread-1 準備插入資料 ...佇列已滿,正在等待...
2. LinkedBlockingQueue
LinkedBlockingQueue 是一個由連結串列實現的有界阻塞佇列。佇列按照FIFO的原則對元素進行排序,佇列預設的最大長度是:Integer.MAX_VALUE。該佇列也是使用 ReentrantLock 對同步資源加鎖。線上程池中,LinkedBlockingQueue是FixedThreadPool和SingleThreadExecutor的工作佇列。下面是它的構造方法原始碼:
// 預設構造器
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 可設定容量的構造器
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
3. LinkedBlockingDeque
LinkedBlockingDeque 是一個由連結串列結構組成的雙向有界阻塞佇列:可以從佇列的兩端插入和移除元素。雙向佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,減少了一半的競爭。該佇列也是使用 ReentrantLock 對同步資源加鎖。LinkedBlockingDeque 與其他阻塞佇列相比,多了 addFist、addLast、offerFist、offerLast、peekFirst、peekLast等方法:以First結尾的方法表示插入、獲取或移除雙向佇列的第一個元素;以Last結尾的方法表示插入、獲取或移除雙向佇列的最後一個元素。下面是其構造器原始碼:
// 預設構造器
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
// 可設定容量的構造器
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
4. PriorityBlockingQueue
PriorityBlockingQueue 是一個支援優先順序的無界阻塞佇列:預設情況下元素採取升序排列,也可以通過自定義類實現compareTo()方法來制定元素的排序規則,或者在初始化時制定構造引數Comparator來對元素進行排序。該佇列也是使用 ReentrantLock 對同步資源加鎖。下面是其構造器原始碼:
// 預設構造器
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// 定製化排序
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
5. SynchronousQueue
SynchronousQueue 是一個不儲存元素的阻塞佇列:每一個put操作必須等待一個take操作,否則不能繼續新增元素。該佇列也是使用 ReentrantLock 對同步資源加鎖,預設情況下執行緒採用非公平策略訪問佇列。SynchronousQueue 的身份類似於一箇中轉者,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒,佇列本身並不儲存任何元素,非常適合傳遞性場景。
6. LinkedTransferQueue
LinkedTransferQueue 是一個由連結串列組成的無界阻塞佇列。該佇列也是使用 ReentrantLock 對同步資源加鎖。相比於其他佇列,該佇列多了tryTransfer和transfer方法:
-
transfer方法:如果當前消費者正在等待接收元素,transfer方法可以把生產者傳入的元素立刻傳輸給消費者。如果沒有消費者等待接收元素,transfer方法會將元素存放在佇列的tail節點(尾節點),並等到該元素被消費者消費了才返回。
-
tryTransfer方法:用來試探生產者傳入的元素是否能直接傳遞給消費者,如果沒有消費者等待接收,則返回false。與transfer方法的區別在於:無論消費者是否接收都會立即返回,而transfer方法是必須等到消費者消費了才返回。
7. DelayQueue
DelayQueue 是一個支援支援延時獲取元素的無界阻塞佇列:佇列使用PriorityQueue來實現。該佇列也是使用 ReentrantLock 對同步資源加鎖。佇列中的元素必須實現 Delayed 介面,在建立元素時可以指定多久才能從佇列中獲取當前元素,只有在延時期滿時才能從佇列中提取元素。
DelayQueue的使用場景:
-
快取系統的設計:使用 DelayQueue 儲存快取元素的有效期,使用一個執行緒迴圈查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示快取有效期已到。
-
定時任務排程:使用 DelayQueue 儲存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行。
參考
《Java併發程式設計的藝術》
</font