Java併發學習(二十三)-LinkedBlockingQueue和LinkedBlockingDeque分析
有兩個比較相似的併發阻塞佇列,LinkedBlockingQueue和LinkedBlockingDeque,兩個都是佇列,只不過前者只能一端出一端入,後者則可以兩端同時出入,並且都是結構改變執行緒安全的佇列。其實兩個佇列從實現思想上比較容易理解,有以下特點:
- 連結串列結構(動態陣列)
- 通過ReentrantLock實現鎖
- 利用Condition實現佇列的阻塞等待,喚醒
以下將分開講述LinkedBlockingQueue和LinkedBlockingDeque的基本特點及操作。
LinkedBlockingQueue
這是一個只能一端出一端如的單向佇列結構,是有FIFO特性的,並且是通過兩個ReentrantLock和兩個Condition來實現的。先看它的結構基本欄位:
/**
* 基於連結串列。
* FIFO
* 單向
*最大容量是Integer.MAX_VALUE.
*/
public class LinkedBlockingQueueAnalysis<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/*
* 兩個方向。
* putLock
* takeLock
* 有些操作會需要同時獲取兩把鎖。
* 例如remove操作,也需要獲取兩把鎖
*/
//主要的node節點
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//容量,一開始就固定了的。
private final int capacity;
//用AtomicInteger 來記錄數量。
private final AtomicInteger count = new AtomicInteger();
//head節點 head.item == null
transient Node<E> head;
//last節點,last.next == null
private transient Node<E> last;
//take鎖
private final ReentrantLock takeLock = new ReentrantLock();
//等待take的節點序列。
private final Condition notEmpty = takeLock.newCondition();
//put的lock。
private final ReentrantLock putLock = new ReentrantLock();
//等待puts的佇列。
private final Condition notFull = putLock.newCondition();
...
}
和LinkedBlockingDeque的區別之一就是,LinkedBlockingQueue採用了兩把鎖來對佇列進行操作,也就是隊尾新增的時候,
隊頭仍然可以刪除等操作。接下來看典型的操作。
put操作
首先看put操作:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); //e不能為null
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock; //獲取put鎖
final AtomicInteger count = this.count; //獲取count
putLock.lockInterruptibly();
try {
while (count.get() == capacity) { //如果滿了,那麼就需要使用notFull阻塞
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity) //如果此時又有空間了,那麼notFull喚醒
notFull.signal();
} finally {
putLock.unlock(); //釋放鎖
}
if (c == 0) //當c為0時候,也要根take鎖說一下,併發下
signalNotEmpty(); //呼叫notEmpty
}
主要的思想還是比較容易理解的,現在看看enqueue
方法:
private void enqueue(Node<E> node) { //入對操作。
last = last.next = node; //隊尾進
}
再看看signalNotEmpty
方法:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); //加鎖
try {
notEmpty.signal(); //用於signal,notEmpty
} finally {
takeLock.unlock();
}
}
take操作
take操作,就是從佇列裡面彈出一個元素,下面看它的詳細程式碼:
public E take() throws InterruptedException {
E x;
int c = -1; //設定一個記錄變數
final AtomicInteger count = this.count; //獲得count
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); //加鎖
try {
while (count.get() == 0) { //如果沒有元素,那麼就阻塞性等待
notEmpty.await();
}
x = dequeue(); //一定可以拿到。
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); //報告還有元素,喚醒佇列
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); //解鎖
return x;
}
接下來看dequeue
方法:
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC 指向自己,幫助gc回收
head = first;
E x = first.item; //從隊頭出。
first.item = null; //將head.item設為null。
return x;
}
對於LinkedBlockingQueue來說,有兩個ReentrantLock分別控制隊頭和隊尾,這樣就可以使得新增操作分開來做,一般的操作是獲取一把鎖就可以,但有些操作例如remove操作,則需要同時獲取兩把鎖:
public boolean remove(Object o) {
if (o == null) return false;
fullyLock(); //獲取鎖
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) { //依次迴圈遍歷
if (o.equals(p.item)) { //找到了
unlink(p, trail); //解除連結
return true;
}
}
return false; //沒找到,或者解除失敗
} finally {
fullyUnlock();
}
}
當然,除了上述的remove方法外,在Iterator的next方法,remove方法以及LBQSpliterator分割迭代器中也是需要加全鎖進行操作的。
LinkedBlockingDeque
名字很相近,LinkedBlockingDeque就是一個雙端佇列,任何一端都可以進行元素的出入,接下來看它的主要欄位:
/**
* 雙端佇列。
* 最大值是Integer.MAX_VALUE
* 所謂弱一致性有利於刪除,有點理解了,
* 或許是比如clear方法,不知直接把引用置為null,而是一個個解除連線。
* 利用lock鎖去控制併發訪問,利用condition去控制阻塞
* weakly consistent的iterators。
* 我們需要保持所有的node都要是gc可達的。
*/
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
//雙向聯結的節點。
static final class Node<E> {
E item; //泛型的item變數
// 前一個節點
Node<E> prev;
//next後一個節點
Node<E> next;
Node(E x) {
item = x;
}
}
//頭節點
transient Node<E> first;
//尾節點。
transient Node<E> last;
//count,表示數值。
private transient int count;
//容量
private final int capacity;
//實現控制訪問的鎖
final ReentrantLock lock = new ReentrantLock();
//take的Condition
private final Condition notEmpty = lock.newCondition();
//put的Condition
private final Condition notFull = lock.newCondition();
...
}
從上面的結果來看,其實LinkedBlockingDeque的結構上來說,有點像ArrayBlockingQueue的構造,也是一個ReentrantLock和兩個Condition,下面分別對其中重要方法進行分析。
- public void addFirst(E e)
- public void addLast(E e)
- public boolean offerFirst(E e)
- public boolean offerLast(E e)
- …
對於LinkedBlockingDeque,和ArrayBlockingQueue結構還是很類似的,也是一個ReentrantLock和兩個Condition使用,但是僅僅是在這二者使用上,其實內部運轉還是很大不同的。
offerFirst操作
offerFirst
就是在隊頭新增一個元素:
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock; //加鎖
lock.lock();
try {
return linkFirst(node);
} finally {
lock.unlock();
}
}
接下來看linkFirst
方法:
private boolean linkFirst(Node<E> node) {
if (count >= capacity) //容量滿了
return false;
Node<E> f = first; //在隊頭新增
node.next = f;
first = node;
if (last == null) //第一個節點
last = node;
else
f.prev = node;
++count; //count自增
notEmpty.signal(); //說明不為null。喚醒等待佇列
return true;
}
其他的方法類似,都是加鎖後對連結串列的操作,這裡就不贅述了。
clear操作
其實我一開始看clear操作時候,總以為它是直接把first和last分別置為null就行了,非常簡單,但實際上,它的實現方法卻是遍歷以便,分別把所有node指標都指向null從而方便gc。
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock(); //加鎖後清空所有。
try {
for (Node<E> f = first; f != null; ) { //遍歷一遍
f.item = null; //置空操作
Node<E> n = f.next;
f.prev = null;
f.next = null;
f = n; //f後移動一個
}
first = last = null;
count = 0;
notFull.signalAll(); //通知等待put執行緒
} finally {
lock.unlock();
}
}
這樣的思路很值得學習借鑑。
總結
總的來說,這兩個阻塞佇列實現上還是比較容易理解的,具體細節方面還是很值得閱讀的。