死磕java concurrent包系列(五)基於AQS的條件佇列把LinkedBlockingQueue“扒光”
LinkedBlockingQueue的基礎
LinkedBlockingQueue是一個基於連結串列的阻塞佇列,實際使用上與ArrayBlockingQueue完全一樣,我們只需要把之前烤雞的例子中的Queue物件替換一下即可。如果對於ArrayBlockingQueue不熟悉,可以去看看https://juejin.im/post/5c0f79f3f265da61561f1bec
LinkedBlockingQueue原始碼分析
原始碼在node上註釋寫明瞭,它是基於一個“two lock queue”演算法實現的,感興趣的同學可以參考這篇paper:www.cs.rochester.edu/u/scott/pap…
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
複製程式碼
每個新增到LinkedBlockingQueue佇列中的資料都將被封裝成Node節點(這個node不同於AQS中的node,它是一個單向連結串列),其中head和last分別指向佇列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對併發進行控制,也就是說,新增和刪除操作並不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。這裡再次強調如果沒有給LinkedBlockingQueue指定容量大小,其預設值將是Integer.MAX_VALUE,如果存在新增速度大於刪除速度時候,有可能會記憶體溢位,這點在使用前希望慎重考慮。至於LinkedBlockingQueue的實現原理圖與ArrayBlockingQueue是類似的,除了對新增和移除方法使用單獨的鎖控制外,兩者都使用了不同的Condition條件物件作為等待佇列,用於掛起take執行緒和put執行緒。 總結如下圖:
LinkedBlockingQueue的阻塞新增
同樣的,新增的方法主要有:add offer 和put。我們先看看非阻塞新增的add和offer方法,這兩個方法的區別同樣是新增失敗時,add方法是拋異常,offer方法是返回false
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
複製程式碼
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//因為存在併發操作移出和入隊互不衝突,與arrayBlockingQueue不同,count被宣告為Atomic
final AtomicInteger count = this.count;
//佇列滿了直接返回
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//因為存在併發問題,加鎖之後再次判斷一下佇列有沒有滿
if (count.get() < capacity) {
//入隊
enqueue(node);
//容量+1返回舊值
c = count.getAndIncrement();
//因為在入隊時可能同時有出隊的執行緒同時把元素移除,所以在入隊後做一個補償,
//如果佇列還有空間,那麼喚醒一個如歸的執行緒執行新增操作
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//c==0,只有可能最開始就是一個空佇列(注意上面的c返回的是舊值)此時因為剛好添加了一個元素,
//所以喚醒消費的執行緒去取移出元素
if (c == 0)
signalNotEmpty();
return c >= 0;
}
複製程式碼
//入隊操作
private void enqueue(Node<E> node) {
//佇列尾節點指向新的node節點
last = last.next = node;
}
//signalNotEmpty方法去喚醒移出元素的執行緒,為什麼要先獲取鎖才能signal呢?不懂的同學回去看看AQS:
//因為條件佇列是基於AQS的鎖存在的,用法上必須要這麼用,否則會丟擲異常
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
//喚醒獲取並刪除元素的執行緒
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
複製程式碼
這裡的Offer()方法做了兩件事:
- 第一件事是判斷佇列是否滿,滿了就直接釋放鎖,沒滿就將節點封裝成Node入隊,然後加鎖後再次判斷佇列新增完成後是否已滿,不滿就繼續喚醒等到在條件物件notFull上的新增執行緒。
- 第二件事是,判斷是否需要喚醒等到在notEmpty條件物件上的消費執行緒。
接下來看看put方法,與offer方法如出一轍:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//鎖可被中斷
putLock.lockInterruptibly();
try {
//佇列滿時加入notFull條件佇列
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
//佇列還沒有滿時,繼續喚醒新增執行緒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//c==0,只有可能最開始就是一個空佇列(注意上面的c返回的是舊值)此時因為剛好添加了一個元素,
//所以喚醒消費的執行緒去取移出元素
if (c == 0)
signalNotEmpty();
}
複製程式碼
這裡有幾個問題:
問題1:
為什麼新增完成後是繼續喚醒在條件佇列notFull上的新增執行緒而不是像ArrayBlockingQueue那樣直接喚醒notEmpty條件物件上的消費執行緒?
分析1: 先回想一下ArrayBlockingQueue:它內部只有一個鎖,在內部完成新增元素操作後直接喚醒消費執行緒去消費。如果ArrayBlockingQueue在新增元素之後再喚醒新增執行緒的話,消費的執行緒就可能一直被block,無法執行。 而為了避免這種情況,對於LinkedBlockingQueue來說,他有兩個鎖,新增和刪除元素不是互斥的,新增的過程中可能已經刪除好幾個元素了,所以他在設計上要儘可能的去喚醒兩個條件佇列。 新增執行緒在佇列沒有滿時自己直接喚醒自己的其他新增執行緒,如果沒有等待的新增執行緒,直接結束了。如果有就直到佇列元素已滿才結束掛起。注意消費執行緒的執行過程也是如此。這也是為什麼LinkedBlockingQueue的吞吐量要相對大些的原因。
問題2: 為什麼if (c == 0)時才去喚醒消費執行緒呢
分析2: 什麼情況下c等於0呢?c值是新增元素前佇列的大小,也就是說,之前是空佇列,空佇列時會有什麼情況呢,空佇列會阻塞所有的take程序,將其封裝到notEmpty的條件佇列中。這個時候,c之前是0,現在在執行了enqueue方法後,佇列中有元素了,所以他需要立即喚醒阻塞的take程序,否則阻塞的take程序就一直block在佇列裡,一直沉睡下去。 為什麼c>0時,就不會喚醒呢?因為take方法和put方法一樣,take方法每次take完元素後,如果佇列還有值,它會繼續喚醒take佇列,也就是說他只要沒有被await()阻塞,他就會一直不斷的喚醒take執行緒,而不需要再新增的時候再去喚醒,造成不必要的效能浪費
LinkedBlockingQueue的阻塞移出
相對的,我們再看看take方法:
public E take() throws InterruptedException {
E x;
int c = -1;
//獲取當前佇列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中斷
try {
//如果佇列沒有資料,當前take執行緒到條件佇列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在資料直接刪除並返回該資料
x = dequeue();
c = count.getAndDecrement();//佇列大小減1,返回之前的值
if (c > 1)
notEmpty.signal();//還有資料就喚醒後續的消費執行緒
} finally {
takeLock.unlock();
}
//滿足條件(之前佇列是滿的,現在剛剛執行dequeue拿出了一個),
//喚醒條件物件上等待佇列中的新增執行緒
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;//獲取頭結點
Node<E> first = h.next; //獲取頭結的下一個節點(要刪除的節點)
h.next = h; // help GC//自己next指向自己,即被刪除
head = first;//更新頭結點
E x = first.item;//獲取刪除節點的值
first.item = null;//清空資料,因為first變成頭結點是不能帶資料的,這樣也就刪除佇列的帶資料的第一個節點
return x;
}
複製程式碼
take方法是一個可阻塞可中斷的移除方法,主要做了兩件事:
- 如果佇列沒有資料就掛起當前執行緒到 notEmpty條件物件的等待佇列中一直等待,如果有資料就刪除節點並返回資料項,同時喚醒後續消費執行緒;
- 嘗試喚醒條件物件notFull上等待佇列中的新增執行緒:假設之前佇列中滿員了,那麼新來的put程序將會被阻塞進notFull條件佇列,然後await掛起沉睡。這個時候有執行緒通過take方法拿出了一個元素,如果此時不喚醒notFull條件佇列,那麼之前滿員時佇列中的執行緒就會一直睡死過去
總結
LinkedBlockingQueue的兩個佇列:
-
notFull條件佇列(佇列滿時阻塞的put執行緒): await的時機:佇列滿了 signal的時機:一是put方法放入元素後,如果佇列還有空位,會singal執行緒繼續新增;二是如果佇列最開始滿員,take方法移出了一個元素後,佇列還有一個空位時也會喚醒它。
-
notEmpty條件佇列(佇列空時候阻塞的take執行緒): await的時機:佇列空了 signal的時機:一是take方法移出元素後,如果佇列還有空位,會singal執行緒繼續移出;二是如果佇列最開始空的,put方法放入了一個元素後,佇列還有一個元素時也會喚醒它。
這種演算法就是“two lock queue”的設計思想,這也是LinkedBlockingQueue的吞吐量較高的本質原因
ArrayBlockingQueue和LinkedBlockingQueue的比較總結
通過上述的分析,對於LinkedBlockingQueue和ArrayBlockingQueue的基本使用以及內部實現原理我們已較為熟悉了,這裡我們就對它們兩間的區別來個小結
1.佇列大小和構造方法有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對於後者而言,當新增速度大於移除速度時,在無界的情況下,可能會造成記憶體溢位等問題,有坑。
2.資料儲存容器不同,ArrayBlockingQueue採用的是陣列作為資料儲存容器,而LinkedBlockingQueue採用的則是以Node節點作為連線物件的單向連結串列。
3.從GC的角度分析:由於ArrayBlockingQueue採用的是陣列的儲存容器,因此在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而LinkedBlockingQueue則會生成一個額外的Node物件。這可能在長時間內需要高效併發地處理大批量資料的時,對於GC可能存在較大影響。
4.兩者的實現佇列新增或移除的鎖不一樣,ArrayBlockingQueue實現的佇列中的鎖是沒有分離的,即新增操作和移除操作採用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的佇列中的鎖是分離的,其新增採用的是putLock,移除採用的則是takeLock,這樣能大大提高佇列的吞吐量,也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。