Java併發程式設計札記-(五)JUC容器-05ArrayBlockingQueue與LinkedBlockingQueue
今天來學習ArrayBlockingQueue與LinkedBlockingQueue。
ArrayBlockingQueue是一個基於陣列的有界阻塞佇列。“有界”表示陣列容量是固定的。這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。
屬性
/** 佇列中的資料 */
final Object[] items;
/** 下個要刪除的項的索引(take, poll, peek ,remove方法使用) */
int takeIndex;
/** 下個插入的位置(put, offer, add方法使用) */
int putIndex;
/** 佇列中元素的數量 */
int count;
以上是與佇列相關的屬性。下面是與併發控制相關的屬性。
/** 鎖 */
final ReentrantLock lock;
/** 不空的condition */
private final Condition notEmpty;
/** 不滿的condition */
private final Condition notFull;
核心方法
offer(E e)
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this .lock;
//獲取獨佔鎖
lock.lock();
try {
// 如果佇列已滿,則返回false。
if (count == items.length)
return false;
else {
// 如果佇列未滿,則插入e,並返回true。
enqueue(e);
return true;
}
} finally {
//釋放鎖
lock.unlock();
}
}
enqueue(E x)方法原始碼如下:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//將x新增到佇列中
items[putIndex] = x;
//如果佇列已滿,則將[下一個被新增元素的索引]置為0
if (++putIndex == items.length)
putIndex = 0;
count++;
//喚醒notEmpty上的等待執行緒
notEmpty.signal();
}
將元素新增到佇列之前,必須先獲得獨佔鎖。加鎖後,若發現佇列已滿,返回false。(為什麼這裡沒有呼叫notFull.await()方法?)將元素插入到佇列後,呼叫notEmpty.signal()喚醒notEmpty上的等待執行緒。
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//獲取鎖,若當前執行緒是中斷狀態,則丟擲InterruptedException異常
lock.lockInterruptibly();
try {
//如果佇列為空,則一直等待。
while (count == 0)
notEmpty.await();
//取出元素並返回
return dequeue();
} finally {
//釋放鎖
lock.unlock();
}
}
dequeue()方法如下
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//喚醒在notFull上等待的執行緒
notFull.signal();
return x;
}
將元素添從佇列中移除之前,必須先獲得獨佔鎖。加鎖後,若發現佇列為空,呼叫notEmpty.await(),使執行緒在notEmpty上等待。如果佇列不為空,將元素添從佇列中移除,然後呼叫notFull.signal()喚醒notFull的等待執行緒。
LinkedBlockingQueue是一個基於單向連結串列的、可指定大小的阻塞佇列。
可選的容量範圍構造方法引數作為防止佇列過度擴充套件的一種方法。如果未指定容量,則它等於 Integer.MAX_VALUE。除非插入節點會使佇列超出容量,否則每次插入後會動態地建立連結節點。
節點類
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
屬性
/** 容量。初始化LinkedBlockingQueue時需要指定,如果不指定則預設為Integer.MAX_VALUE if none */
private final int capacity;
/** 連結串列的實際大小 */
private final AtomicInteger count = new AtomicInteger();
/**
* 連結串列的頭結點
* 以下表達式一直成立: head.item == null
*/
transient Node<E> head;
/**
* 連結串列的尾節點
* 以下表達式一直成立: last.next == null
*/
private transient Node<E> last;
/** 獲取操作的鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 獲取操作的等待佇列condition */
private final Condition notEmpty = takeLock.newCondition();
/** 插入操作的鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 插入操作的等待佇列condition */
private final Condition notFull = putLock.newCondition();
以上屬性透漏出一個重要資訊:插入和獲取操作使用了不同的鎖。
offer(E e)
/**
* 將指定元素插入到此佇列的尾部(如果立即可行且不會超出此佇列的容量)
* 在成功時返回 true,如果此佇列已滿,則返回 false。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果當前沒有空間可用,則返回 false。
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//加鎖
putLock.lock();
try {
//再次判斷佇列是否已滿
if (count.get() < capacity) {
//插入元素到佇列
enqueue(node);
c = count.getAndIncrement();
//如果插入元素後,元素仍未滿,喚醒在notFull上的等待執行緒
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//釋放所
putLock.unlock();
}
//如果在插入節點前,佇列為空;則插入節點後,喚醒notEmpty上的等待執行緒
if (c == 0)
signalNotEmpty();
return c >= 0;
}
enqueue(Node)原始碼如下
private void enqueue(Node<E> node) {
last = last.next = node;
}
last = last.next = node;
執行後last.next的值為null?
signalNotEmpty()原始碼如下
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
注意notEmpty是與takeLock相關聯的,必須先獲取takeLock鎖,再呼叫notEmpty.signal()。
take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//獲取鎖,若當前執行緒是中斷狀態,則丟擲InterruptedException異常
takeLock.lockInterruptibly();
try {
//若佇列為空,則一直等待。
while (count.get() == 0) {
notEmpty.await();
}
//取出元素
x = dequeue();
//取出元素之後,返回原始的節點數量,然後節點數量-1。
c = count.getAndDecrement();
//如果取出元素後,佇列不為空,則喚醒在notEmpty上的等待執行緒
if (c > 1)
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
//如果在獲取節點之前佇列為滿;則取出節點後,喚醒notFull上的等待執行緒
if (c == capacity)
signalNotFull();
return x;
}
dequeue()原始碼如下:
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
本文就講到這裡,想了解Java併發程式設計更多內容請參考: