BlockingQueue的雙鎖原始碼解析
1.構造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
其中capacity是佇列的長度,構造方法很簡單,初始化node,並設定佇列的最大容量capacity。
2.核心屬性
/** The capacity bound, or Integer.MAX_VALUE if none */ //佇列長度,不指定預設是Integer.MAX_VALUE private final int capacity; /** Current number of elements */ //當前佇列的元素個數,用AtomicInteger 保證同步 private final AtomicInteger count = new AtomicInteger(); /** Lock held by take, poll, etc */ //take鎖,型別為ReentrantLock private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ //take的條件 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //put鎖,型別為ReentrantLock private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ //put的條件 private final Condition notFull = putLock.newCondition();
3.入隊put
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
第一步判斷入隊元素的合法性,第二步新建一個元素Node,然後請求鎖並拿到當前佇列的元素總數count,這些都比較容易理解,我們重點關注下try裡的邏輯
1.先判斷count等於佇列最大長度capacity,此時用notFull阻塞等待,為什麼這裡不用if判斷而是while呢?因為當阻塞被喚醒後,if會直接執行enqueue(node);操作,而在執行增加操作前可能又被其它執行緒拿到鎖新增滿了,所以必須再次判斷才可以保證正確性。
2.完成入隊後,判斷c + 1 < capacity,然後隨機喚醒一個notFull,這裡為什麼是喚醒一個消費者執行緒而不是喚醒全部呢?原因是有可能在佇列滿的時候假如共有5個生產執行緒,那麼5個都會阻塞,這時消費者同時消費了多個元素,但是可能只發出了1個喚醒生產者的訊號,這時候醒著的put執行緒就會通過這種方式來喚醒其它的4個put執行緒,以彌補take執行緒的訊號不足。相比於signalAll()喚醒所有生產者,這種解決方案使得同一時間最多隻有一個生產者在清醒的競爭鎖,效能提升非常明顯。
這裡入隊邏輯基本完成,出隊邏輯是和入隊對應的。
4.特殊情況
我們知道LinkedBlockingQueue是通過兩把鎖一把是put鎖,一把是take鎖,但是這樣有一個特殊情況當佇列長度為1時,到底入隊和出隊之間會存在鎖競爭嗎?
我們來看它是怎麼做的
//初始化
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//入隊操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//出隊操作
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
1.初始化時,定義了一個dummy節點,這個和lock、countdownlatch實現一樣,都有一個哨兵節點,head和tail都指向這個哨兵。
2.在隊尾入隊時,tail節點後移,並指向這個第一個入隊的元素,此時head還是指向dummy。
3.出隊時,建立一個Node h指向head也就是dummy,然後first指向head的next節點,然後把first的值賦值x,消除first,返回x。
總的來說就是互換head和head.next的值,最終把x返回.