【Java併發程式設計】LinkedBlockingQueue的使用(六)
阿新 • • 發佈:2019-02-16
一、LinkedBlockingQueue
1.1 簡介
LinkedBlockingQueue是一個由連結串列結構組成的有界阻塞佇列,此佇列是FIFO(先進先出)的順序來訪問的,它由隊尾插入後再從隊頭取出或移除,其中佇列的頭部是在佇列中時間最長的元素,佇列的尾部是在佇列中時間最短的元素。在LinkedBlockingQueue類中分別用2個不同的鎖takeLock、putLock來保護隊頭和隊尾操作。如下圖所示:
1.2 類圖
1.3 原始碼分析
1.3.1 屬性與連結串列節點類
//連結串列節點類,next指向下一個節點。如果下一個節點時null表示沒有節點了。 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } // 最大容量上限,預設是 Integer.MAX_VALUE private final int capacity; // 當前元素數量,這是個原子類。 private final AtomicInteger count = new AtomicInteger(0); // 頭結點 private transient Node<E> head; // 尾結點 private transient Node<E> last; // 隊頭訪問鎖 private final ReentrantLock takeLock = new ReentrantLock(); // 隊頭訪問等待條件、佇列 private final Condition notEmpty = takeLock.newCondition(); // 隊尾訪問鎖 private final ReentrantLock putLock = new ReentrantLock(); // 隊尾訪問等待條件、佇列 private final Condition notFull = putLock.newCondition();
使用原子類AtomicInteger是因為讀寫分別使用了不同的鎖,但都會訪問這個屬性來計算佇列中元素的數量,所以它需要是執行緒安全的。關
1.3.2 offer操作
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //當佇列滿時,直接返回了false,沒有被阻塞等待元素插入 if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node(e); //開啟隊尾保護鎖 final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); //原則計數類 c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { //釋放鎖 putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; } //在持有鎖下指向下一個節點 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
1.3.3 put操作
//put 操作把指定元素新增到隊尾,如果沒有空間則一直等待。 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. //註釋:在所有的 put/take/etc等操作中變數c為負數表示失敗,>=0表示成功。 int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ /* * 注意,count用於等待監視,即使它沒有用鎖保護。這個可行是因為 * count 只能在此刻(持有putLock)減小(其他put執行緒都被鎖拒之門外), * 當count對capacity發生變化時,當前執行緒(或其他put等待執行緒)將被通知。 * 在其他等待監視的使用中也類似。 */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); // 還有可新增空間則喚醒put等待執行緒。 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1.3.4 take操作
//彈出隊頭元素,如果沒有會被阻塞直到元素返回
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();//沒有元素一直阻塞
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)//如果還有可獲取元素,喚醒等待獲取的執行緒。
notEmpty.signal();
} finally {
//拿到元素後釋放鎖
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//在持有鎖下返回佇列隊頭第一個節點
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
//出隊後的節點作為頭節點並將元素置空
head = first;
E x = first.item;
first.item = null;
return x;
}
1.3.5 remove操作
//移除指定元素。
public boolean remove(Object o) {
if (o == null) return false;
//對兩把鎖加鎖
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
//p是移除元素所在節點,trail是移除元素的上一個節點
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
//將trail下一個節點指向p的下一個節點
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//釋放鎖時確保和加鎖順序一致
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
注意,鎖的釋放順序與加鎖順序是相反的。
參考資料
http://ifeve.com/juc-linkedblockingqueue/