BlockingQueue的雙鎖源碼解析
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
其中capacity是隊列的長度,構造方法很簡單,初始化node,並設置隊列的最大容量capacity。
2.核心屬性
/** The capacity bound, or Integer.MAX_VALUE if none */ //隊列長度,不指定默認是Integer.MAX_VALUE private final int capacity; /** Current number of elements */ //當前隊列的元素個數,用AtomicInteger 保證同步 private final AtomicInteger count = new AtomicInteger(); /** Lock held by take, poll, etc */ //take鎖,類型為ReentrantLock private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ //take的條件 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //put鎖,類型為ReentrantLock private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ //put的條件 private final Condition notFull = putLock.newCondition();
3.入隊put
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
第一步判斷入隊元素的合法性,第二步新建一個元素Node,然後請求鎖並拿到當前隊列的元素總數count,這些都比較容易理解,我們重點關註下try裏的邏輯
1.先判斷count等於隊列最大長度capacity,此時用notFull阻塞等待,為什麽這裏不用if判斷而是while呢?因為當阻塞被喚醒後,if會直接執行enqueue(node);操作,而在執行增加操作前可能又被其它線程拿到鎖添加滿了,所以必須再次判斷才可以保證正確性。
2.完成入隊後,判斷c + 1 < capacity,然後隨機喚醒一個notFull,這裏為什麽是喚醒一個消費者線程而不是喚醒全部呢?原因是有可能在隊列滿的時候假如共有5個生產線程,那麽5個都會阻塞,這時消費者同時消費了多個元素,但是可能只發出了1個喚醒生產者的信號,這時候醒著的put線程就會通過這種方式來喚醒其它的4個put線程,以彌補take線程的信號不足。相比於signalAll()喚醒所有生產者,這種解決方案使得同一時間最多只有一個生產者在清醒的競爭鎖,性能提升非常明顯。這裏入隊邏輯基本完成,出隊邏輯是和入隊對應的。
4.特殊情況
我們知道LinkedBlockingQueue是通過兩把鎖一把是put鎖,一把是take鎖,但是這樣有一個特殊情況當隊列長度為1時,到底入隊和出隊之間會存在鎖競爭嗎?
我們來看它是怎麽做的
//初始化
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//入隊操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//出隊操作
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
1.初始化時,定義了一個dummy節點,這個和lock、countdownlatch實現一樣,都有一個哨兵節點,head和tail都指向這個哨兵。
2.在隊尾入隊時,tail節點後移,並指向這個第一個入隊的元素,此時head還是指向dummy。
3.出隊時,創建一個Node h指向head也就是dummy,然後first指向head的next節點,然後把first的值賦值x,消除first,返回x。
總的來說就是互換head和head.next的值,最終把x返回.
BlockingQueue的雙鎖源碼解析