1. 程式人生 > >Java阻塞佇列詳解

Java阻塞佇列詳解

更新日誌

日期 更新內容 備註
2017-11-03 新增轉載標誌 持續更新

阻塞佇列

阻塞佇列是一種佇列,一種可以在多執行緒環境下使用,並且支援阻塞等待的佇列。也就是說,阻塞佇列和一般的佇列的區別就在於:

  1. 多執行緒環境支援,多個執行緒可以安全的訪問佇列
  2. 支援生產和消費等待,多個執行緒之間互相配合,當佇列為空的時候,消費執行緒會阻塞等待佇列不為空;當佇列滿了的時候,生產線 程就會阻塞直到佇列不滿。

Java提供了豐富的阻塞佇列,下面的類圖展示了java提供的阻塞佇列:

java阻塞佇列

阻塞佇列在java中的一種典型使用場景是執行緒池,線上程池中,當提交的任務不能被立即得到執行的時候,執行緒池就會將提交的任務放到一個阻塞的任務佇列中來,比如下面的程式碼:


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool使用可LinkedBlockingQueue這種阻塞佇列。


    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool使用了SynchronousQueue這種佇列,這種佇列的特點是不快取資料,而是快取執行緒,執行緒分為生產者執行緒和消費者執行緒,一個生產者執行緒和一個消費者執行緒是互補的,當一個生產者執行緒遇到一個消費者執行緒的時候就會直接進行資料交換,所以這種佇列的技術點比較高,理解起來難度較大。一個執行緒只能快取一個數據,當一個執行緒插入資料之後就會被阻塞,直到另外一個執行緒消費了其中的資料。

阻塞佇列還提供了其他型別的佇列,包括雙端阻塞佇列,延時阻塞佇列,延時阻塞佇列的使用可以在newScheduledThreadPool中找到,newScheduledThreadPool裡面使用延時阻塞佇列來排程週期性任務執行。

下面展示的是BlockingQueue提供的一些方法:

BlockingQueue方法

根據插入和取出兩種型別的操作,具體分為下面一些型別:

操作型別 Throws Exception Special Value Blocked Timed out
插入 add(o) offer(o) put(o) offer(o, timeout, unit)
取出(刪除) remove(o) poll() take() poll(timeout, unit)
  • Throws Exception 型別的插入和取出在不能立即被執行的時候就會丟擲異常。
  • Special Value 型別的插入和取出在不能被立即執行的情況下會返回一個特殊的值(true 或者 false)
  • Blocked 型別的插入和取出操作在不能被立即執行的時候會阻塞執行緒直到可以操作的時候會被其他執行緒喚醒
  • Timed out 型別的插入和取出操作在不能立即執行的時候會被阻塞一定的時候,如果在指定的時間內沒有被執行,那麼會返回一個特殊值

本文將對java的阻塞佇列進行一些分析,分為下面幾個內容進行組織:

  • 介紹最為基本和最為簡單的ArrayBlockingQueue和LinkedBlockingQueue,也是最常用的
  • LinkedBlockingDeque,雙端阻塞佇列
  • DelayQueue延時佇列,這是一個非常有趣的阻塞佇列,你同樣可以參考DelayedWorkQueue的實現
  • PriorityBlockingQueue優先阻塞佇列,很明顯這在需要多執行緒支援、需要優先順序佇列支援的場景下會被運用
  • 然後是SynchronousQueue同步佇列,這是最為複雜的佇列(個人認為),本文將知識嘗試分析,更為具體的、更為深入的分析應該閱讀原始碼,並且閱讀原始碼中提到的一些paper

ArrayBlockingQueue和LinkedBlockingQueue

ArrayBlockingQueue和LinkedBlockingQueue是最為常用的阻塞佇列,前者使用一個有邊界的陣列來作為儲存介質,而後者使用了一個沒有邊界的連結串列來儲存資料。下面分別分析一下他們的實現細節:

ArrayBlockingQueue

ArrayBlockingQueue需要你提供陣列的大小,下面是ArrayBlockingQueue提供的三個建構函式:


public ArrayBlockingQueue(int capacity): // 初始化陣列大小

public ArrayBlockingQueue(int capacity, boolean fair): //初始化陣列大小,並且設定是否是fire模式
 
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) //初始化陣列大小,設定是否是fair模式,然後使用一個集合初始化阻塞佇列


在建構函式中有兩個比較關鍵的引數,一個是capacity代表阻塞佇列使用的陣列的長度,另外一個是fair,代表阻塞佇列的一種策略選擇,用於構造用於執行緒同步的鎖(ReentrantLock),關於ReentrantLock的細節不在本文的敘述範圍,關於java鎖的詳細內容將在其他的篇章中單獨學習分析。

下面展示了ArrayBlockingQueue的一些關鍵的成員變數:


    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;


關於成員變數已經在註釋裡面解釋了,ArrayBlockingQueue使用了ReentrantLock來做同步,使用兩個Condition來做插入同步和獲取同步。下面是兩個重要的方法,一個用於將一個元素插入佇列中去,一個用於從佇列中獲取一個元素,並且從佇列中刪除:


    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

首先,將變數放在合適的位置,然後更新索引。最為重要的是 notEmpty.signal()這句話會喚醒任意一個等待在notEmpty這個條件變數上的執行緒,關於signal()這個方法的描述,可以見下面的描述:


     * Wakes up one waiting thread.
     *
     * <p>If any threads are waiting on this condition then one
     * is selected for waking up. That thread must then re-acquire the
     * lock before returning from {@code await}.

那為什麼需要這樣做呢?notEmpty這個條件變數用於表示佇列是否有資料,插入資料勢必會讓佇列不為空,而在插入資料之前,可能會有執行緒已經嘗試來獲取資料了,那麼就會等待在這個條件變數上面,那麼當插入資料之後,需要喚醒這些執行緒,為了減少不必要的麻煩,這個條件變數在插入一個數據之後僅僅喚醒一個等待在這個條件變數上的執行緒。

還有一點需要注意,這個陣列的使用配合了兩個遊標變數:takeIndex和putIndex,配合這兩個變數之後陣列的使用就像是一個環形佇列一樣了。注意,可能有人擔心會有一種情況,佇列滿了之後沒有消費執行緒,再次插入第一個佇列的元素會被覆蓋嗎?這就多慮了,具體我們看下面的程式碼:


    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }


上面的程式碼展示了put操作的細節,可以很明顯的看到,當陣列中的元素數量達到設定的容量之後就會在notFull這個條件變數上等待,而不會再次呼叫enqueue這個方法來插入,所以不會出現上面的那種情況。


       private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

上面的程式碼展示了獲取一個佇列元素的方法細節。依然需要關注的是notFull.signal(),這句話的意思是:喚醒一個等待在notFull這個條件變數上的執行緒。具體的語義是什麼呢?就是,有可能有執行緒在進行插入操作的時候會發現佇列被用完了,那麼就會阻塞到notFull這個條件變數上,當某個執行緒獲取了一個元素之後,佇列就有空閒的空間可以插入了,那麼就可以喚醒一個等待在這個條件變數上的執行緒了,具體就是喚醒一個等待插入的執行緒開始活動。
下面具體分析一下幾個重要的方法:

  • put(o)

put方法的內容上文中已經提到過,在此不再羅列,它是一個阻塞方法,在操作不能立刻得到執行的時候會阻塞等待。具體的就是,如果發現佇列使用的陣列沒有可用的容量了,那麼就等待在一個條件變數上,而這個條件變數需要在有空閒空間的時候喚醒等待在他上面的執行緒。

  • offer(e)

該方法在插入操作不能立即執行的時候就會返回false,否則會返回true代表插入成功了。具體的細節見下面的程式碼:


  public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    

當然在多執行緒環境下,插入操作需要鎖住,也就是靠鎖來達到同步執行緒的功能。

  • offer(e, timeout, unit)

和offer(e)一樣在操作不能執行的時候就會返回特殊值,不同的是會等待一段時間,然後再返回。

  • take()

take操作會在獲取元素失敗的時候阻塞直達有執行緒喚醒它。下面是具體的細節:


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

  • poll()

poll和offer類似,只是poll從佇列中獲取資料,而offer插入資料。

  • poll(timeout, unit)

和offer(o, timeout, unit) 類似

  • peek()

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

peek操作會取得佇列頭的內容,但是不會將其從佇列中刪除,下次peek還是同樣的內容。

  • remove(e)方法

佇列可以插入,可以獲取,當然還可以刪除,remove(e)方法只會刪除第一個匹配的元素,remove(e)方法藉助removeAt(index)方法來刪除一個元素,在刪除成功的時候會返回true,否則會返回false。下面具體分析一下removeAt(index)這個方法:


    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }

刪除的思路就是:和刪除陣列中的元素一樣,需要移動陣列,所以這個操作是比較耗時的,在刪除一個元素完成後,有可能有執行緒等待在插入元素的條件變數上,而現在有空閒的空間可以插入元素了,所以需要喚醒一個等待的執行緒讓他插入元素。

關於ArrayBlockingQueue的內容就分析到這裡,這是一個最為簡單的阻塞佇列實現,對ArrayBlockingQueue的分析也較為細節,下面分析LinkedBlockingQueue的時候有的內容就會一筆帶過,因為LinkedBlockingQueue和ArrayBlockingQueue事一樣的,只是ArrayBlockingQueue使用陣列做佇列,而LinkedBlockingQueue使用連結串列做佇列。如何選擇哪種阻塞佇列和如何選擇陣列和連結串列這兩種資料結構的思路是一樣的,對於頻繁進行佇列元素獲取操作的場景下,首選ArrayBlockingQueue,而在需要頻繁進行佇列元素刪除、新增的場景下首選LinkedBlockingQueue。

LinkedBlockingQueue

LinkedBlockingQueue使用連結串列來作為佇列的資料結構,下面就是連結串列節點的資料結構,可以發現是非常簡單的:

    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

下面展示了LinkedBlockingQueue的關鍵成員變數:


/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

需要注意的是,LinkedBlockingQueue的插入操作和獲取資料的操作使用了不同的鎖,文件解釋是使用了“two lock queue”演算法,具體的細節可以參考:two lock queue Algorithm

下面展示了兩個操作條件變數的util方法:


    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

signalNotEmpty方法首先獲取takeLock,然後對等待在notEmpty這個條件變數上的某個執行緒進行喚醒操作。而signalNotFull則首先獲取putLock,然後對等待在notFull這個條件變數上的某個執行緒進行喚醒操作。和ArrayBlockingQueue一樣,LinkedBlockingQueue也提供了兩個插入佇列和從佇列獲取元素的方法:


    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }


相比於ArrayBlockingQueue更為簡單了,只是一些連結串列操作,新的元素將被放到連結串列的尾部,而獲取元素的操作將從連結串列首部獲取節點。

下面的程式碼展示了put操作的細節:


    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

如果佇列容量已經用完了,那麼該執行緒就會阻塞在notFull這個條件變數上等待喚醒。如果佇列以前為空,此次插入式第一個元素,那麼有可能在插入元素之前已經有執行緒試圖獲取元素,那麼這個試圖獲取元素的執行緒就會阻塞住,所以需要告訴它們中的一個,現在有資料可以獲取了,可以醒來消費資料了。

下面的程式碼展示了take方法的細節:


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

和put方法相反,當佇列裡面沒有資料的時候就會阻塞等待,獲取元素成功之後會喚醒那些希望插入資料而被阻塞的執行緒(喚醒一個執行緒)。

最後分析一下remove(o)方法,下面是remove(o)的原始碼:


    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

刪除操作會同時將讀和寫鎖都獲取,然後再操作。同樣只會刪除第一個匹配的元素。刪除成功會返回true,否則返回false。需要注意的是,當刪除了一個元素只會,需要通知哪些等待插入而沒有空閒空間而被阻塞的執行緒,告訴他們中的一個可以插入資料了。

LinkedBlockingDeque

LinkedBlockingDeque的實現依靠了雙向連結串列,所以如果想要學習雙向連結串列的話可以去看原始碼,主要是對雙向連結串列的操作,在讀和寫上加上了鎖,以及一些條件變數的操作而已,介於篇幅,本文不再敘述該部分內容,未來會基於LinkedBlockingDeque寫一篇關於java中的連結串列操作的文章。

PriorityBlockingQueue

PriorityBlockingQueue是一個優先阻塞佇列。所謂優先佇列,就是每次從隊佇列裡面獲取到的都是佇列中優先順序最高的,對於優先順序,PriorityBlockingQueue需要你為插入其中的元素型別提供一個Comparator,PriorityBlockingQueue使用這個Comparator來確定元素之間的優先順序關係。我們應該知道有一種資料結構叫做堆,堆是一種二叉樹,這個二叉樹的特點是,任意的節點都比它的兒子節點大或者小,所以我們可以猜測,PriorityBlockingQueue中入隊和出隊的操作就是一系列操作堆這種資料結構的細節。
介於篇幅限制,本小節只分析PriorityBlockingQueue的入隊操作,關於其他的比如出隊、刪除等操作均可參照入隊操作。再者,關於堆這種資料結構的描述,不再本文中詳細敘述,將和LinkedBlockingDeque中提到的連結串列一併在新的文章中分析總結。

下面是put(o)操作的呼叫鏈路:


put(o) -> offer(e) -> tryGrow -> siftUpComparable| siftUpUsingComparator


下面是offer(o)方法的程式碼:


    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }


當有限佇列的容量不足時會進行擴容操作,當然使用陣列擴容的老套路就是:獲取一個新的容量足夠大的陣列,然後將原來的陣列內容複製到新的陣列中去,然後釋放原先的老的陣列。陣列容量增長的策略如下:


  int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));

可以發現,如果原來的容量小於64,那麼每次grow只會增長(2 + old),否則增長原來的一半。

佇列容量增長完成之後,offer操作得以繼續,會檢視是否設定了Comparator,如果沒有顯示設定,那麼就會試圖去使用容器中元素的Comparator,否則使用設定好的Comparator,下面基於已經設定好Comparator的前提下分析。



    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

這很明顯就是一個將元素壓入堆這種資料結構的演算法啊,x為將要壓入的資料,k為佇列尾,array為目標陣列,cmp為比較器。為了說明將一個元素壓入堆中的具體過程,下面舉一個栗子:


假設我們維持一個由基本型別int霸佔的優先佇列,我們希望每次獲取到的都是最小的元素,並且假設現在堆中已經有了
幾個資料:1,2,4,5,7,8,那堆的結構應該類似於:
 
            1
      |           |
      2           4
 |        |   |       |    
 7        8   5       Nil

現在我們想要插入一個數據3,則具體過程為:

1、將3放在最後位置
2、將3和自己的父親節點比較,如果小於父親節點,那麼互動
3、繼續執行步驟2直到父親節點的值小於自己,或者已經交換到根節點了,則壓入完成

根據上面的三步,現在分析一下插入3之後的動作。首先,3和父親節點4比較,3小於4成立,則3和4交換,然後3繼續和
自己的父親節點1做比較,1小於3,已經找到合適的位置了,不需要再比較交換了,則壓入3成功,最後的堆的結構應該類似於:

            1
      |           |
      2           3
 |        |   |       |    
 7        8   5       4

從堆中獲取一個數據的分析和插入類似,不再贅述。


現在,我們來看一下siftUpUsingComparator這個方法,首先將x放在最末尾,然後和自己的父親節點比較,如果滿足Comparator條件則結束,否則繼續向上比較。插入元素需要向上比較並且交換,而獲取一個元素則需要先將末尾的元素放在根節點,然後向下比較並且互動。當然,插入一個元素之後,需要將等待獲取元素而被阻塞的執行緒喚醒一個來消費資料。

DelayQueue

DelayQueue是一個延時佇列,所謂延時佇列就是消費執行緒將會延時一段時間來消費元素。介於篇幅限制,本小節只分析take()方法的實現細節,畢竟延時佇列的特點就是在消費的時候,下面展示了take()方法的實現細節:


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }


需要注意的是,在延時佇列中儲存的物件需要是Delayed的子類,下面是該類的細節:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

只有一個方法,就是要獲取消費延時。

繼續看take方法的細節。延時阻塞佇列使用了優先阻塞佇列來儲存資料,資料的獲取是有優先順序的,這一點需要注意,在這點上,我們應該和java的執行緒池的排程執行緒池的實現聯絡起來,在java的排程執行緒池的實現上,也使用了延時佇列,而優先順序佇列可以保證執行緒池排程的任務都是根據時間優先順序被排程的。take方法首先從優先佇列中獲取第一個元素,然後詢問是否需要延時,如果不需要,則直接返回,否則延時設定的時間之後再返回。

更多關於延時佇列的內容,可以參考DelayQueue的原始碼,以及參考java排程執行緒池實現時使用的DelayedWorkQueue。

SynchronousQueue

最後分析的SynchronousQueue是最為複雜的阻塞佇列。SynchronousQueue和前面分析的阻塞佇列都不同,因為SynchronousQueue不存在容量的說法,任何插入操作都需要等待其他執行緒來消費,否則就會阻塞等待,也就是說,生產執行緒生產出一條資料之後就要等待消費者執行緒來將其消費掉,才能繼續生產資料,否則就會阻塞等待消費。佇列中會把到來的執行緒快取起來,當然會進行一些操作,下面是大概的演算法:


1、佇列初始化為null
2、當一個執行緒達到之後,如果佇列為null,則將該執行緒放到佇列中去,否則,判斷佇列中的第一個元素是否和當前到達的元素
匹配,如果匹配,那麼兩個執行緒的資料交易完成,否則也將新到達的執行緒資料快取到佇列中。


SynchronousQueue有一個fair-mode引數,fair模式和non-fair模式。關於這兩種模式的區別,可以參考下面的文件說明:


     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.

SynchronousQueue通過使用Transferer類的transfer(E e, boolean timed, long nanos)方法來完成資料交易操作,根據fair模式和non-fair模式有兩種型別的Transferer,fair模式對應於TransferQueue,non-fair模式對應TransferStack。

TransferQueue

首先分析fair模式下的操作。TransferQueue使用一個連結串列儲存資料,下面展示了使用的連結串列節點的資料結構:


            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

next指向下一個連結串列節點,item是該節點上的資料內容,waiter是等待在該節點上的執行緒,isData代表這是一個生產執行緒節點還是一個消費執行緒節點,如果是生產執行緒節點,那麼isData就為true,消費執行緒節點就為false。下面來分析一下最為主要的方法:transfer(E e, boolean timed, long nanos)

對於該方法的執行原理,可以參考下面的描述:


             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.

總結一下,就是:

  1. 如果當前queue是null的話,或者佇列頭和當前執行緒持有相同的mode(讀或者寫),那麼將當前元素插入佇列中
  2. 如果當前操作和佇列頭的操作互補(也就是讀-寫,寫-讀),那麼試圖去交易資料

具體的程式碼看下面:


        E transfer(E e, boolean timed, long nanos) {
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos); 
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }


可以看到,該方法的實現上是較為複雜的,下面我們根據上面提到的演算法來分析一下這個方法的程式碼。

首先判斷該呼叫是生成執行緒還是消費執行緒,也就是獲得isData的值。然後和佇列中的元素進行匹配,如果匹配成功,則交易資料,否則將此次操作的資訊加入到佇列中去,然後等待匹配。awaitFulfill方法負責等到匹配完成。

下面來分析一下一個具體的操作的執行流程,比如put操作。下面是put方法的程式碼:


    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

可以看到put方法使用了transfer這個方法,而put方法屬於寫方法,所以第一個引數不為null,也就是isData為true。它需要和一個isData為false的操作相匹配。

關於更為具體和深入的內容,請查閱更多的資料以及去閱讀原始碼,本文對該內容的分析到底為止。

TransferStack

關於TransferStack的分析,改日再談!當然對於SynchronousQueue的分析也需要另外的篇幅,而本文為了文章內容的完整性提到了SynchronousQueue,但是SynchronousQueue過於複雜,首先是能力限制,目前無法深入分析SynchronousQueue的實現,再者篇幅限制,本文因為需要提供了大量的程式碼,篇幅過長影響閱讀體驗,很多內容點到為止,需要用其他的篇章來詳細分析,這些內容包括:

  • java中的連結串列操作
  • java中的優先佇列(堆)
  • java中的鎖
  • 以及本小節內容(SynchronousQueue)

這些內容要麼較為重要,要麼較為複雜,需要重點關注!本文的重點在於java中的阻塞佇列,文章描述了最為基本的阻塞佇列ArrayBlockingQueue和LinkedBlockingQueue,以及PriorityBlockingQueue,然後是DelayQueue,最後提到了SynchronousQueue,當然還有很多需要補充的內容,以後會做補充!

轉載於:https://www.jianshu.com/p/4028efdbfc35