併發容器學習—LinkedBlockingQueue和LinkedBlockingDueue
//從這可知,LinkedBlockingQueue是個單鏈表
static class Node<E> {
E item; //資料
Node<E> next; //後繼結點
Node(E x) { item = x; }
}
2.LinkedBlockingQueue的繼承體系
LinkedBlockingQueue的繼承關係如下圖所示,由繼承關係可知LinkedBlockingQueue與ArrayBlockingQueue實現的功能是相同的,只在儲存資料結構上不同。其父類及實現的介面在之前的學習中都已分析過,這裡不在多說。
3.重要的屬性以及構造方法
在LinkedBlockingQueue中保證執行緒併發安全所使用的的方式與ArrayBlockingQueue相似,都是通過使用重入鎖ReentrantLock來實現的,不同的是ArrayBlockingQueue不論出隊還是入隊使用的都是同一把鎖,因此ArrayBlockingQueue在實際使用使是不能出入隊併發執行的,而LinkedBlockingQueue在這邊方面則不同,LinkedBlockingQueue的出入隊是各一把鎖,分別控制出入隊操作。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //阻塞佇列的容量,不能超過int型別的最大值 private final int capacity; //佇列中元素的數量,是個原子計數型別 private final AtomicInteger count = new AtomicInteger(); //底層連結串列的頭結點,即隊首 transient Node<E> head; //底層連結串列的尾結點,即隊尾 private transient Node<E> last; //重入鎖,用於出隊操作 private final ReentrantLock takeLock = new ReentrantLock(); //佇列允許出隊條件 private final Condition notEmpty = takeLock.newCondition(); //重入鎖,用於入地操作 private final ReentrantLock putLock = new ReentrantLock(); //佇列允許入隊條件 private final Condition notFull = putLock.newCondition(); //預設構造方法,佇列容量為最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //指定容量的構造方法 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); //初始化連結串列 } //擁有初始資料的阻塞佇列 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } }
4.入隊過程
與ArrayBlockingQueue相同,LinkedBlockingQueue中的入隊方法也是put、add和offer三種,不過相比ArrayBlockingQueue中方法,它們更簡單一些。
//父類AbstractQueue中的新增方法,新增失敗就拋異常
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//入隊方法,容量不足就放棄入隊
public boolean offer(E e) {
//從這可以知道,LinkedBlockingQueue中也是不允許null元素的
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count; //獲取元素計數器
//判斷佇列是否已滿,已滿不允許在新增元素到佇列中
if (count.get() == capacity)
return false;
int c = -1; //用於記錄佇列中原有元素的數量
Node<E> node = new Node<E>(e); //新建e元素接地
final ReentrantLock putLock = this.putLock;
putLock.lock(); //入隊鎖
try {
//獲取鎖後再次判斷佇列容量是否已滿,已滿放棄入隊
if (count.get() < capacity) {
enqueue(node); //真正執行入隊的方法
c = count.getAndIncrement(); //元素計數+1,並且將原來元素的個數返回
//判斷入隊後,佇列是否還有剩餘容量,若有就喚醒其他某個入隊操作執行緒
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock(); //釋放鎖
}
//判斷佇列原來是不是空佇列,即未新增元素之前是不是為空
//若為空,那麼出隊操作此時應該都在等待狀態,需要喚醒某個出隊操作的執行緒
if (c == 0)
signalNotEmpty(); //喚醒一個出隊操作執行緒
return c >= 0;
}
//向佇列中新增隊尾元素
private void enqueue(Node<E> node) {
last = last.next = node; //直接新增元素到連結串列尾結點
}
//喚醒出隊操作的執行緒
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock; //獲取出隊鎖
takeLock.lock();
try {
notEmpty.signal(); //隨機喚醒某個出隊操作執行緒
} finally {
takeLock.unlock();
}
}
//在一定時間內執行入隊操作,若超過指定時間仍無法入隊,那就放棄入隊,可被中斷
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//從這可以知道,LinkedBlockingQueue中也是不允許null元素的
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout); //換算超時時間單位
int c = -1;
final ReentrantLock putLock = this.putLock; //獲取入隊鎖
final AtomicInteger count = this.count; //獲取計數器
putLock.lockInterruptibly(); //可被中斷的加鎖
try {
//判斷佇列容量是否已滿,使用迴圈是為了防止虛假喚醒
//佇列若是已滿,則等待一定時間在嘗試入隊
while (count.get() == capacity) {
if (nanos <= 0) //判斷等待時間是否還有剩餘
return false; //超時,返回入隊失敗
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
//向佇列中新增元素,若佇列容量不足,則等待直到佇列有空間後繼續新增
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock; //獲取入隊鎖
final AtomicInteger count = this.count; //計數器
putLock.lockInterruptibly();
try {
//判斷佇列是否已滿,佇列已滿則,入隊操作執行緒進入等待狀態
//喚醒後要再次判斷佇列是否有空間,防止虛假喚醒
while (count.get() == capacity) {
//佇列已滿,執行緒進入條件佇列等待
notFull.await();
}
enqueue(node); //入隊
c = count.getAndIncrement(); //獲取原數量,並增加佇列中的元素數量
if (c + 1 < capacity)
//到此,說明佇列中至少有一個元素,那麼就能進行出隊操作
//因此可以喚醒一個等待出隊的執行緒執行出隊操作
notFull.signal();
} finally {
putLock.unlock();
}
/判斷佇列原來是不是空佇列,即未新增元素之前是不是為空
//若為空,那麼出隊操作此時應該都在等待狀態,需要喚醒某個出隊操作的執行緒
if (c == 0)
signalNotEmpty();
}
5.出隊過程
在LinkedBlockingQueue中的出隊方法也是隻有兩種poll和take,一個不等待,一個等待。
//移除並返回隊首元素,若佇列為空,則返回null
public E poll() {
final AtomicInteger count = this.count; //獲取佇列元素個數
if (count.get() == 0) //判斷是不是空佇列,空佇列直接返回null
return null;
E x = null; //用於記錄移除出隊的結點
int c = -1; //用於記錄佇列原來結點數量
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//判斷佇列是否為空,不為空才能移除並獲取隊首結點
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement(); //記錄舊計數,並結點更新計數
//出隊後,佇列仍不為空,那麼久可以繼續喚醒一個出隊操作的執行緒
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//在一定時間內嘗試出隊操作,若超時仍未成功,則返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0) //判斷是否超時
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//移除並返回隊首元素,若佇列為空,則等待
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//判斷佇列是否是空佇列,若是空佇列那麼當前出隊操作進入等待狀態
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
//判斷佇列是否為空,佇列不空,則可以繼續喚醒其他的出隊操作執行緒
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//判斷未進行本次出隊操作前佇列是否已滿,佇列若是已滿,說明所有的
//入隊操作要麼處於等待狀態,要麼不能成功,而現在至少佇列執行過一次
//出隊操作,此時佇列必然還有容量可以執行入隊操作,因此可以喚醒任意一個
//執行入隊操作的執行緒
if (c == capacity)
signalNotFull();
return x;
}
6.peek方法
//獲取但不移除隊首元素
public E peek() {
//佇列中沒有元素的話,就返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next; //head.next結點就是隊首元素對應的結點
//判斷隊首是否為null,為null說明佇列是空佇列
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
7.remove方法
//遍歷佇列查詢o元素對應的結點並將其從佇列中移除
public boolean remove(Object o) {
if (o == null) return false;
fullyLock(); //獲取出入隊鎖,防止併發問題
try {
//遍歷佇列對應的連結串列,查詢要移除的元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//判斷結點是否是要刪除的結點,若是,那就要將該結點從
//連結串列中移除
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock(); //將出入隊鎖都釋放
}
}
//刪除佇列中元素時,要對其他的出入隊操作進行同步
//因此刪除操作要將出隊鎖和入隊鎖都獲取到
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//將結點p從連結串列中移除
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next; //trail的後繼結點變為p的後繼結點
//若是p是尾結點,那麼將last變為trail(trail是p的前驅結點)
if (last == p)
last = trail;
//移除一個結點,那麼結點計數要-1,並且可以喚醒入隊操作的執行緒了
if (count.getAndDecrement() == capacity)
notFull.signal();
}
8.size的統計
public int size() {
return count.get(); //佇列中的元素個數直接返回計數器值
}
二、LinkedBlockingDueue併發容器
1.LinkedBlockingDueue的底層實現
LinkedBlockingDueue可以看做是LinkedBlockingQueue的升級版,LinkedBlockingQueue能做的LinkedBlockingDueue也能做,不能做的LinkedBlockingDueue還能做,其底層資料結構也是連結串列,不過與LinkedBlockingQueue的單鏈表不同,LinkedBlockingDueue是雙向連結串列,並且還可以做堆疊使用。
結點的定義如下:
static final class Node<E> {
//儲存資料
E item;
//前驅結點
Node<E> prev;
//後繼結點
Node<E> next;
Node(E x) {
item = x;
}
}
2.LinkedBlockingDueue的繼承關係
LinkedBlockingDueue的繼承關係如下所示,相比LinkedBlockingQueue多實現了一個雙端佇列的介面。
接下來看看BlockingDeque中定義了哪些方法:
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
//雙端佇列中若還有容量,將元素新增到隊首,否則丟擲異常
void addFirst(E e);
//雙端佇列中若還有容量,將元素新增到隊尾,否則丟擲異常
void addLast(E e);
//雙端佇列中若還有容量,將元素新增到隊首,否則返回false
boolean offerFirst(E e);
//雙端佇列中若還有容量,將元素新增到隊尾,否則返回false
boolean offerLast(E e);
//雙端佇列中若還有容量,立即將元素新增到隊首,否則等待容量有空閒在新增
void putFirst(E e) throws InterruptedException;
//雙端佇列中若還有容量,立即將元素新增到隊尾,否則等待容量有空閒在新增
void putLast(E e) throws InterruptedException;
//在一定時間內嘗試將元素新增到隊首,若到指定時間還沒新增成功,則返回false
boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//在一定時間內嘗試將元素新增到隊尾,若到指定時間還沒新增成功,則返回false
boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//若佇列不空,立即獲取並移除隊首元素,若佇列已空則等待到佇列中有元素在執行
E takeFirst() throws InterruptedException;
//若佇列不空,立即獲取並移除隊尾元素,若佇列已空則等待到佇列中有元素在執行
E takeLast() throws InterruptedException;
//在一定時間內嘗試獲取並移除隊首元素,若到達指定時間佇列中仍沒有元素,直接放棄嘗試,返回null
E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException;
//在一定時間內嘗試獲取並移除隊尾元素,若到達指定時間佇列中仍沒有元素,直接放棄嘗試,返回null
E pollLast(long timeout, TimeUnit unit)
throws InterruptedException;
//移除佇列中第一次出現的o元素
boolean removeFirstOccurrence(Object o);
//移除佇列中最後一個出現的o元素
boolean removeLastOccurrence(Object o);
//雙端佇列中若還有容量,將元素新增到隊尾,否則丟擲異常
//該方法等同於addLast
boolean add(E e);
//雙端佇列中若還有容量,將元素新增到隊尾,否則返回false
//該方法等同於offerLast
boolean offer(E e);
//若佇列不滿,則立即向隊尾新增元素,否則等待佇列有空間後在新增
void put(E e) throws InterruptedException;
//在一定時間內嘗試將元素新增到隊尾,若到指定時間還沒新增成功,則返回false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//獲取並移除隊首,若佇列為空,則拋異常
E remove();
//獲取並移除隊首,若佇列為空,則返回null
E poll();
//獲取並移除隊首,若佇列為空,則等待佇列不為空在執行
E take() throws InterruptedException;
//在一定時間內嘗試獲取並移除隊尾元素,若到達指定時間佇列中仍沒有元素,直接放棄嘗試,返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//獲取但不移除隊首元素,若佇列為空,那麼拋異常
E element();
//獲取但不移除隊首元素,若佇列為空,那麼返回null
E peek();
//刪除隊裡中第一次出現的o元素
boolean remove(Object o);
//判斷佇列中是否含有o元素
public boolean contains(Object o);
//佇列中元素的數量
public int size();
//獲取佇列的迭代器
Iterator<E> iterator();
//向佇列中壓入一個元素,即向隊首新增一個元素,若佇列沒有
//容量,則丟擲異常,等同於addFirst
void push(E e);
}
3.重要屬性及構造方法
LinkedBlockingDueue也是個容量可選(最大為Integer.MAX_VALUE)的阻塞佇列,且執行緒安全。與LinkedBlockingQueue相似,其執行緒安全也是通過ReentrantLock來實現的,不過略微不同的似,LinkedBlockingDueue的底層只有一個重入鎖,而LinkedBlockingQueue則有兩個。
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
//佇列的頭結點
transient Node<E> first;
//隊尾結點
transient Node<E> last;
//佇列中的結點計數
private transient int count;
//佇列容量
private final int capacity;
//重入鎖
final ReentrantLock lock = new ReentrantLock();
//佇列允許出隊條件
private final Condition notEmpty = lock.newCondition();
//佇列允許入隊條件
private final Condition notFull = lock.newCondition();
//使用預設容量的佇列
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
//指定容量的佇列
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
//帶有初始元素的佇列
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}
}
4.入隊過程
由對BlockingDeque的分析可知,LinkedBlockingDueue中存在著大量的入隊方法,這裡就不一一分析了,因為實現基本都差不多,只挑選個別來看看。
//addFirst方法的本質其實還是呼叫offerFirst
//向隊首新增元素,若佇列容量不足,則拋異常
public void addFirst(E e) {
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}
//向隊首新增元素,若佇列容量不足,則返回false
public boolean offerFirst(E e) {
//佇列中不允許null元素存在
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e); //新建對應結點
final ReentrantLock lock = this.lock;
lock.lock(); //加鎖
try {
return linkFirst(node); //真正執行新增隊首結點的方法
} finally {
lock.unlock();
}
}
//向隊首新增結點
private boolean linkFirst(Node<E> node) {
//判斷佇列是否已滿
if (count >= capacity)
return false; //佇列已滿直接返回失敗
Node<E> f = first; //獲取隊首結點
node.next = f; //將原隊首結點設為新增結點的後繼結點
first = node; //將新增結點設為隊首結點
//判斷原佇列中是否為空佇列
if (last == null)
last = node; //原佇列若為空佇列,那麼此時隊首隊尾都是同一個結點
else
f.prev = node; //設定原來的隊首結點的前驅結點為新增結點
++count; //佇列中的結點數量+1
notEmpty.signal(); //喚醒執行出隊操作的執行緒
return true;
}
//向隊尾新增元素,若佇列容量不足,則拋異常
public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}
//向隊尾新增元素,若佇列容量不足,則返回false
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node); //真正實現新增隊尾結點的方法
} finally {
lock.unlock();
}
}
//向隊尾新增結點
private boolean linkLast(Node<E> node) {
//判斷佇列是否已滿
if (count >= capacity)
return false;
Node<E> l = last; //獲取當前隊尾結點
node.prev = l; //將新增結點的前驅設為l
last = node; //新增節點設為隊尾
//判斷佇列原本是否為空
//若為空,則新增結點既是隊首也是隊尾
if (first == null)
first = node;
else
l.next = node; //將l節點的後繼設為新增結點
++count; //計數+1
notEmpty.signal(); //喚醒執行出隊操作的執行緒
return true;
}
//向隊首新增結點,若佇列已滿,則等待
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node)) //若新增隊首結點失敗,則執行緒進入等待狀態
notFull.await();
} finally {
lock.unlock();
}
}
//向隊尾新增結點,若佇列已滿,則等待
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node)) //若新增隊尾結點失敗,則執行緒進入等待狀態
notFull.await();
} finally {
lock.unlock();
}
}
5.出隊過程
同入隊一樣,出隊的方法也很多,這裡也只選個別來分析:
//獲取並移除隊首元素,若佇列為空,則返回null
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst(); //將隊首結點從佇列中移除
} finally {
lock.unlock();
}
}
//移除隊首節點的方法
private E unlinkFirst() {
//獲取隊首結點
Node<E> f = first;
//判斷佇列是否為空佇列,空佇列直接返回null
if (f == null)
return null;
//獲取隊首結點的後繼結點,要作為新的隊首結點
Node<E> n = f.next;
E item = f.item; //獲取隊首節點的資料,用作返回值
f.item = null; //清空隊首結點,方便GC回收
f.next = f; //隊首出隊的後繼設為自己
first = n; //設定新的隊首為n
//判斷佇列是否還有結點
if (n == null)
last = null; //佇列若是空了,那麼隊尾也設為null
else
n.prev = null; //新隊首的前驅設為null
--count; //佇列中的結點計數-1
//喚醒執行入隊操作的執行緒,佇列剛執行一次出隊操作,必然有剩餘空間
//因此可以執行入隊操作
notFull.signal();
return item;
}
//獲取並移除隊尾元素,若佇列為空,則返回null
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast(); //將隊尾結點從佇列中移除
} finally {
lock.unlock();
}
}
//移除隊尾結點的方法
private E unlinkLast() {
//獲取隊尾引用
Node<E> l = last;
//判斷佇列是否是空佇列,空佇列直接返回null
if (l == null)
return null;
Node<E> p = l.prev; //獲取隊尾的前驅結點,用作新的隊尾結點
E item = l.item;
l.item = null;
l.prev = l; //隊尾結點出隊後的前驅設為自身,方便GC回收
last = p; //設定新隊尾
//判斷隊尾出隊後佇列中是否還是有結點,即佇列是否成了空佇列
if (p == null)
first = null; //空佇列的隊首也是null
else
p.next = null; //新隊尾的後繼設為null
--count; //結點計數-1
notFull.signal(); //喚醒入隊操作的執行緒
return item;
}
//將隊首移除並返回,若佇列已空則等待
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
//判斷移除隊首結點是否成功,失敗則等待
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
//將隊尾移除並返回,若佇列已空則等待
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
//判斷移除隊尾結點是否成功,失敗則等待
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
6.總結
LinkedBlockingDueue中其他的方法就不一一分析了,都比較簡單。
與LinkedBlockingQueue對比,LinkedBlockingDueue的執行緒安全以及阻塞等待的實現基本沒有區別,兩個阻塞佇列基本可以通用(LinkedBlockingDueue用作棧時除外)。兩個佇列基本上只有兩點不同:一個是底層資料結構的細微區別,LinkedBlockingQueue是單向連結串列,而LinkedBlockingDueue則是雙向連結串列;另一個是重入鎖的使用有些區別,LinkedBlockingDueue不論出入隊都使用的是同一個鎖物件,而LinkedBlockingQueue的出入隊鎖