JDK原始碼分析-LinkedBlockingQueue
概述
前文「JDK原始碼分析-ArrayBlockingQueue」分析了 ArrayBlockingQueue 的程式碼實現,LinkedBlockingQueue 也是阻塞佇列的實現。與前者不同的是,後者內部是由連結串列實現的。
LinkedBlockingQueue 的繼承結構如下:
下面分析其主要方法的程式碼實現。
程式碼分析
LinkedBlockingQueue 內部有一個巢狀類 Node,它表示連結串列的節點,如下:
static class Node<E> {
E item; // 節點元素
Node<E> next; // 後繼節點
Node(E x) { item = x; }
}
PS: 從 Node 定義可以看出該連結串列是一個單連結串列。
主要成員變數
// 連結串列的容量(若不指定則為 Integer.MAX_VALUE)
private final int capacity;
// 當前元素的數量(即連結串列中元素的數量)
private final AtomicInteger count = new AtomicInteger();
// 連結串列的頭節點(節點元素為空)
transient Node<E> head;
// 連結串列的尾結點(節點元素為空)
private transient Node<E> last;
// take、poll 等出隊操作持有的鎖
private
/** Wait queue for waiting takes */
// 出隊鎖的條件佇列
private final Condition notEmpty = takeLock.newCondition();
// put、offer 等入隊操作的鎖
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
// 入隊鎖的條件佇列
private final Condition notFull = putLock.newCondition();
構造器
LinkedBlockingQueue 有三個構造器,分別如下:
// 構造器 1:無參構造器,初始容量為 Integer.MAX_VALUE,即 2^31-1public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 構造器 2:指定容量的構造器
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化連結串列的頭尾節點
last = head = new Node<E>(null);
// 構造器 3:用給定集合初始化的構造器
public LinkedBlockingQueue(Collection<? extends E> c) {
// 呼叫構造器 2 進行初始化
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended,but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
// 將集合中的元素封裝成 Node 物件,並新增到連結串列末尾
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
enqueue 方法如下:
// 將 node 節點新增到連結串列末尾private void enqueue(Node<E> node) {
last = last.next = node;
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 把 E 封裝成 Node 節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
*> * out by lock),and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 若佇列已滿,notFull 等待(類比生產者)
while (count.get() == capacity) {
notFull.await();
// node 入隊
enqueue(node);
c = count.getAndIncrement();
// 若該元素新增後,佇列仍未滿,喚醒一個其他生產者執行緒
if (c + 1 < capacity)
notFull.signal();
// c==0 說明之前佇列為空,出隊執行緒處於等待狀態,
// 新增一個元素後,將出隊執行緒喚醒(消費者)
if (c == 0)
signalNotEmpty();
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
// 喚醒 notEmpty 條件下的一個執行緒(消費者)
notEmpty.signal();
takeLock.unlock();
public boolean offer(E e,long timeout,TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
// 等待超時,返回 false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
// 入隊
enqueue(new Node<E>(e));
return true;
}
該方法與 put 操作類似,不同的是 put 方法在佇列滿時會一直等待,而該方法有超時時間,超時後返回 false。
3. offer(E):
public boolean offer(E e) { if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 若佇列已滿,直接返回 false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
// 佇列未滿,入隊
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
// 佇列未滿,喚醒 notFull 下的執行緒,繼續入隊
notFull.signal();
} finally {
}
if (c == 0)
return c >= 0;
public E take() throws InterruptedException {
E x;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
// 佇列為空,notEmpty 條件下的執行緒等待(消費者)
while (count.get() == 0) {
notEmpty.await();
// 從佇列頭部刪除節點
x = dequeue();
c = count.getAndDecrement();
// 若佇列不為空,喚醒一個 notEmpty 條件下的執行緒(消費者)
if (c > 1)
notEmpty.signal();
// 佇列已經不滿了,喚醒 notFull 條件下的執行緒(生產者)
if (c == capacity)
signalNotFull();
return x;
private E dequeue() {
Node<E> h = head; // 頭節點
Node<E> first = h.next; // 頭節點的後繼節點
h.next = h; // help GC // 後繼節點指向自己(從連結串列中刪除)
head = first; // 更新頭節點
E x = first.item; // 獲取要刪除節點的資料
first.item = null; // 清空資料(新的頭節點)
private void signalNotFull() {
putLock.lock();
// 喚醒生產者
notFull.signal();
public E poll(long timeout,TimeUnit unit) throws InterruptedException {
E x = null;
final ReentrantLock takeLock = this.takeLock;
// 佇列已空
while (count.get() == 0) {
// 超時返回 null
return null;
nanos = notEmpty.awaitNanos(nanos);
// 若佇列不空,喚醒一個 notEmpty 條件下的執行緒(消費者)
c = count.getAndDecrement();
if (c > 1)
// 佇列不滿,喚醒 notFull 條件下的執行緒(生產者)
if (c == capacity)
public E poll() {
// 佇列為空,返回 null
if (count.get() == 0)
return null;
E x = null;
takeLock.lock();
// 佇列不為空,出隊
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
// 該元素出隊後,佇列仍不為空,喚醒其他消費者
if (c > 1)
notEmpty.signal();
// 佇列已經不滿,喚醒生產者
public E peek() {
if (count.get() == 0)
return null;
// 頭節點的後繼節點
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;