1. 程式人生 > >JDK併發工具類原始碼學習系列——LinkedBlockingQueue

JDK併發工具類原始碼學習系列——LinkedBlockingQueue

LinkedBlockingQueue是一個基於已連結節點的、範圍任意的 blocking queue。此佇列按 FIFO(先進先出)排序元素。佇列的頭部 是在佇列中時間最長的元素。佇列的尾部 是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列獲取操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。

使用場景

LinkedBlockingQueue常用於生產者/消費者模式中,作為生產者和消費者的通訊橋樑。LinkedBlockingQueue與之前介紹的ConcurrentLinkedQueue以及PriorityBlockingQueue功能類似,都是Queue的一種,不同之處是:

  • LinkedBlockingQueue和PriorityBlockingQueue是阻塞的,而ConcurrentLinkedQueue是非阻塞的,
  • 同時LinkedBlockingQueue和PriorityBlockingQueue通過加鎖實現執行緒安全,而ConcurrentLinkedQueue使用CAS實現無鎖模式
  • PriorityBlockingQueue支援優先順序

由於不同的特徵,所以以上三者的使用場景也不同:

  • LinkedBlockingQueue適合需要阻塞的佇列場景,如果能不阻塞或者可以通過程式碼自行實現阻塞,那麼建議使用ConcurrentLinkedQueue代替
  • ConcurrentLinkedQueue適合對效能要求較高,同時無需阻塞的場景使用
  • PriorityBlockingQueue適合需要根據任務的不同優先順序進行調整佇列的順序的場景
結構預覽

LinkedBlockingQueue內部實現相對較簡單,直接使用一個連結串列儲存資料,通過加鎖實現執行緒安全,通過兩個Condition分別實現入隊和出隊的等待。連結串列的節點使用內部類:Node表示,Node很簡單,就兩個變數,由外部類直接修改即可。

/**
 * Linked list node class
 */
static class Node<E> {
    /** The item, volatile to ensure barrier separating write and read */
volatile E item; Node<E> next; Node(E x) { item = x; } }

item使用volatile修飾,解決記憶體可見性。

常用方法解析

LinkedBlockingQueue常用方法有:入隊(offer(E)/offer(E, long, TimeUnit)/put(E))、出隊(poll()/poll(long, TimeUnit)/take())、刪除(remove(Object))。下面分別看看這三類方法。

入隊
/**
 * @By Vicky:入隊,無阻塞,佇列未滿則直接入隊,否則直接返回false
 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;// 儲存當前佇列的長度
    // 這裡因為count是Atomic的,所以有類似volatile的記憶體可見性效果
    // 即對count的修改能夠立即被其他執行緒可見,所以此處不加鎖的情況下讀取count值是會讀取到最新值的
    // 然後根據此值進行前置判斷,避免不必要的加鎖操作
    if (count.get() == capacity)// 佇列已滿直接返回false
        return false;
    int c = -1;
    final ReentrantLock putLock = this.putLock;// 獲取putLock,加鎖
    putLock.lock();
    try {
        if (count.get() < capacity) {// 佇列未滿則插入
            insert(e);
            c = count.getAndIncrement();// 更新count值
            if (c + 1 < capacity)// 未滿則喚醒等待在notFull上的執行緒
                // 此處有點怪異,入隊喚醒notFull~
                // 此處喚醒notFull是考慮有可能如果多個執行緒同時出隊,由於出隊喚醒notFull時也需要對putLock進行加鎖
                // 所以有可能一個執行緒出隊,喚醒notFull,但是被另一個出隊執行緒搶到了鎖,所以入隊執行緒依舊在等待
                // 當另一個執行緒也喚醒了notFull,釋放了putLock後,只能喚醒一個入隊執行緒,所以其他執行緒依舊在等待
                // 所以此處需要再次喚醒notFull
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // c==0表示佇列在插入之前是空的,所以需要喚醒等待在notEmpty上的執行緒
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

/**
 * @By Vicky:喚醒notEmpty,需對takeLock進行加鎖,因為notEmpty與takeLock相關
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

首先解析offer(),另外兩個入隊操作只是在佇列已滿的情況下進行一些特殊處理而已。文中程式碼給出了詳細註釋,這裡著重說明兩個地方:

  • 對Condition的操作需要在加鎖的環境下進行,而且是需要對與Condition相關的鎖進行加鎖,如此處notEmpty是由takeLock.newCondition()得來,所以對notEmpty的操作需要對takeLock進行加鎖
  • 入隊操作也執行notFull.signal();的原因是避免入隊執行緒未搶到鎖而遺失了出隊的喚醒操作。詳細解析可以見文中的註釋

下面直接貼出offer(E, long, TimeUnit)和put(E)的程式碼,基本同offer(E)。

/**
 * @By Vicky:入隊,等待指定時間
 */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        for (;;) {
            // 此處同offer()
            if (count.get() < capacity) {
                insert(e);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
                break;
            }
            // nanos是剩餘的等待時間,<=0表示等待時間已到
            if (nanos <= 0)
                return false;
            try {
                // 呼叫notFull的awaitNanos,指定等待時間,如果等待期間被喚醒,則返回剩餘等待時間,<0表示等待時間已到
                nanos = notFull.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

/**
 * @By Vicky:入隊,無期限等待
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset
    // local var holding count  negative to indicate failure unless set.
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        try {
            while (count.get() == capacity)// 無限等待,直到可用
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to a non-interrupted thread
            throw ie;
        }
        insert(e);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
出隊

出隊操作和入隊邏輯相同,看程式碼。

/**
 * @By Vicky:出隊,無阻塞,佇列為空則直接返回null
 */
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)// 佇列為空,直接返回
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {// 不為空,獲取一個元素
            x = extract();
            c = count.getAndDecrement();
            if (c > 1)// 同offer(),此處需喚醒notEmpty
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();// 同offer(),此處需喚醒notFull
    return x;
}

/**
 * @By Vicky:出隊,將head指向head.next
 * @return
 */
private E extract() {
    Node<E> first = head.next;
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

/**
 * @By Vicky:喚醒notFull,需對putLock進行加鎖,因為notFull與putLock相關
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

出隊一個元素:extract(),邏輯很簡單,將head指向head.next即可。其他地方與offer()的邏輯相同,如佇列未空需喚醒notEmpty,佇列由滿變空需喚醒notFull,原因完全同offer()。poll(long, TimeUnit)和take()程式碼就不貼出來了,完全與offer()相同。

刪除
/**
 * @By Vicky:刪除指定元素
 */
public boolean remove(Object o) {
    if (o == null) return false;
    boolean removed = false;
    fullyLock();// 同時對takeLock和pullLock加鎖,避免任何的入隊和出隊操作
    try {
        Node<E> trail = head;
        Node<E> p = head.next;
        while (p != null) {// 從佇列的head開始迴圈查詢與o相同的元素
            if (o.equals(p.item)) {// 找到相同的元素則設定remove為true
                removed = true;
                break;
            }
            trail = p;// 繼續迴圈
            p = p.next;
        }
        if (removed) {
            // remove==true,則表示查詢到待刪除元素,即p,將trail的next指向p的next,即將p從佇列移除及完成刪除
            p.item = null;
            trail.next = p.next;
            if (last == p)
                last = trail;
            if (count.getAndDecrement() == capacity)
                notFull.signalAll();
        }
    } finally {
        fullyUnlock();
    }
    return removed;
}

刪除的邏輯也很簡單,程式碼中給出了註釋。

以上即本篇全部內容,比較簡單,更多關於佇列的研究可參考:

以上內容如有錯誤,請不吝賜教~