1. 程式人生 > >ArrayBlockingQueue 與 LinkedBlockingQueue

ArrayBlockingQueue 與 LinkedBlockingQueue

  • ArrayBlockingQueue內部的阻塞佇列是通過重入鎖ReenterLockCondition條件佇列實現的,
    • 所以ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別,
    • 對於公平訪問佇列,被阻塞的執行緒可以按照阻塞的先後順序訪問佇列,
      • 即先阻塞的執行緒先訪問佇列。
    • 而非公平佇列,當佇列可用時,阻塞的執行緒將進入爭奪訪問資源的競爭中,
      • 也就是說誰先搶到誰就執行,沒有固定的先後順序。
    • //預設非公平阻塞佇列
      ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
      //公平阻塞佇列
      ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 儲存資料的陣列 */
    final Object[] items;

    /**獲取資料的索引,主要用於take,poll,peek,remove方法 */
    int takeIndex;

    /**新增資料的索引,主要用於 put, offer, or add 方法*/
    int putIndex;

    /** 佇列元素的個數 */
    int count;


    /** 控制並非訪問的鎖 */
    final ReentrantLock lock;

    /**notEmpty條件物件,用於通知take方法佇列已有元素,可執行獲取操作 */
    private final Condition notEmpty;

    /**notFull條件物件,用於通知put方法佇列未滿,可執行新增操作 */
    private final Condition notFull;

    /**
       迭代器
     */
    transient Itrs itrs = null;

}
  • ArrayBlockingQueue內部確實是通過陣列物件items來儲存所有的資料
    • 一個ReentrantLock來同時控制新增執行緒移除執行緒的併發訪問,
      • 這點與LinkedBlockingQueue區別很大(稍後會分析)
    • notEmpty條件物件則是用於存放等待喚醒呼叫take方法的執行緒,
      • 告訴他們佇列已有元素,可以執行獲取操作
    • notFull條件物件是用於等待或喚醒呼叫put方法的執行緒,
      • 告訴它們,佇列未滿,可以執行新增元素的操作
    • 只要putIndex與takeIndex不相等就說明佇列沒有結束

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 節點類,用於儲存資料
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 阻塞佇列的大小,預設為Integer.MAX_VALUE */
    private final int capacity;

    /** 當前阻塞佇列中的元素個數 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 阻塞佇列的頭結點
     */
    transient Node<E> head;

    /**
     * 阻塞佇列的尾節點
     */
    private transient Node<E> last;

    /** 獲取並移除元素時使用的鎖,如take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** notEmpty條件物件,當佇列沒有資料時用於掛起執行刪除的執行緒 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 新增元素時使用的鎖如 put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** notFull條件物件,當佇列資料已滿時用於掛起執行新增的執行緒 */
    private final Condition notFull = putLock.newCondition();

}
  • 由連結串列實現的有界佇列阻塞佇列,但大小預設值為Integer.MAX_VALUE
    • 所以我們在使用LinkedBlockingQueue時建議手動傳值,
    • 為其提供我們所需的大小,
    • 避免佇列過大造成機器負載或者記憶體爆滿等情況。
      • 如果存在新增速度大於刪除速度時候,
      • 有可能會記憶體溢位,這點在使用前希望慎重考慮
  • 在正常情況下,連結佇列的吞吐量要高於基於陣列的佇列(ArrayBlockingQueue)
    • 因為其內部實現新增刪除操作使用的兩個ReenterLock來控制併發執行,
  • 內部維持一個基於連結串列的資料佇列
    • 每個新增到LinkedBlockingQueue佇列中的資料都將被封裝成Node節點,
    • 新增的連結串列佇列中,其中head和last分別指向佇列的頭結點和尾結點
    • 內部分別使用了takeLock 和 putLock 對併發進行控制,
      • 也就是說,新增和刪除操作並不是互斥操作,
      • 可以同時進行,這樣也就可以大大提高吞吐量