1. 程式人生 > >BlockingQueue的雙鎖原始碼解析

BlockingQueue的雙鎖原始碼解析

1.構造方法

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

其中capacity是佇列的長度,構造方法很簡單,初始化node,並設定佇列的最大容量capacity。
2.核心屬性

/** The capacity bound, or Integer.MAX_VALUE if none */
//佇列長度,不指定預設是Integer.MAX_VALUE
private final int capacity;
/** Current number of elements */
//當前佇列的元素個數,用AtomicInteger 保證同步
private final AtomicInteger count = new AtomicInteger();
/** Lock held by take, poll, etc */
//take鎖,型別為ReentrantLock
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//take的條件
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
//put鎖,型別為ReentrantLock
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//put的條件
private final Condition notFull = putLock.newCondition();

3.入隊put

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

第一步判斷入隊元素的合法性,第二步新建一個元素Node,然後請求鎖並拿到當前佇列的元素總數count,這些都比較容易理解,我們重點關注下try裡的邏輯

1.先判斷count等於佇列最大長度capacity,此時用notFull阻塞等待,為什麼這裡不用if判斷而是while呢?因為當阻塞被喚醒後,if會直接執行enqueue(node);操作,而在執行增加操作前可能又被其它執行緒拿到鎖新增滿了,所以必須再次判斷才可以保證正確性。
2.完成入隊後,判斷c + 1 < capacity,然後隨機喚醒一個notFull,這裡為什麼是喚醒一個消費者執行緒而不是喚醒全部呢?原因是有可能在佇列滿的時候假如共有5個生產執行緒,那麼5個都會阻塞,這時消費者同時消費了多個元素,但是可能只發出了1個喚醒生產者的訊號,這時候醒著的put執行緒就會通過這種方式來喚醒其它的4個put執行緒,以彌補take執行緒的訊號不足。相比於signalAll()喚醒所有生產者,這種解決方案使得同一時間最多隻有一個生產者在清醒的競爭鎖,效能提升非常明顯。
這裡入隊邏輯基本完成,出隊邏輯是和入隊對應的。

4.特殊情況
我們知道LinkedBlockingQueue是通過兩把鎖一把是put鎖,一把是take鎖,但是這樣有一個特殊情況當佇列長度為1時,到底入隊和出隊之間會存在鎖競爭嗎?
我們來看它是怎麼做的

//初始化
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
//入隊操作
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
//出隊操作
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

1.初始化時,定義了一個dummy節點,這個和lock、countdownlatch實現一樣,都有一個哨兵節點,head和tail都指向這個哨兵。
2.在隊尾入隊時,tail節點後移,並指向這個第一個入隊的元素,此時head還是指向dummy。
3.出隊時,建立一個Node h指向head也就是dummy,然後first指向head的next節點,然後把first的值賦值x,消除first,返回x。

總的來說就是互換head和head.next的值,最終把x返回.