1. 程式人生 > >ArrayBlockingQueue源碼閱讀(1.8)

ArrayBlockingQueue源碼閱讀(1.8)

mov mutual capacity 超時時間 puts 默認 查看 all rac

ArrayBlockingQueue源碼閱讀

1、ArrayBlockingQueue類結構

??public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable。ArrayBlockingQueue是BlockingQueue接口的一種實現,要了解它就必須清楚BlockingQueue的相關知識;

2、BlockingQueue接口介紹

??在並發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高性能隊列,一個是以BlockingQueue接口為代表的阻塞隊列,無論哪種都繼承自Queue接口!,BlockingQueue的類繼承關系如下:

技術分享圖片

BlockingQueue接口重要方法如下:


  • offer(anObject): 表示如果可能的話, 將anObject加到BlockingQueue裏,即如果BlockingQueue可以容納, 則返回true, 否則返回false.(本方法不阻塞當前執行方法的線程)。
  • offer(E o, long timeout, TimeUnit unit), 可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
  • put(anObject): 把anObject加到BlockingQueue裏, 如果BlockQueue沒有空間, 則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續。
  • poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗,如果不指定超時時間,在沒有數據時立即返回失敗。
  • take(): 取走BlockingQueue裏排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入。
  • drainTo(): 一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。

3、源碼分析

3.1、類屬性查看

/** The queued items */ 以數組作為數據結構
final Object[] items;

/** items index for next take, poll, peek or remove */ 隊列中下一個將被取出值的下標
int takeIndex;

/** items index for next put, offer, or add */  隊列中下一個將被放入值的下標
int putIndex;

/** Number of elements in the queue */ 數組元素數量
int count;

/*
 * Concurrency control uses the classic two-condition algorithm  使用雙條件算法
 * found in any textbook. 
 */

/** Main lock guarding all access */ 使用重入鎖(獨占鎖)
final ReentrantLock lock;
/** Condition for waiting takes */ take時候用於等待的條件
private final Condition notEmpty;
/** Condition for waiting puts */ put時候用於等待的條件
private final Condition notFull;

3.2、構造函數分析

/**

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity and default access policy.
  • @param capacity the capacity of this queue
  • @throws IllegalArgumentException if {@code capacity < 1}
    */

        public ArrayBlockingQueue(int capacity) {
        this(capacity, false);  //調用public ArrayBlockingQueue(int capacity, boolean fair)構造方法,默認使用非公平鎖
    } 

    /**

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity and the specified access policy.
  • @param capacity the capacity of this queue
  • @param fair if {@code true} then queue accesses for threads blocked
  • on insertion or removal, are processed in FIFO order; //如果傳入的值為true即公平鎖,則需要維護一個有序隊列,保證先進先出的原則
  • if {@code false} the access order is unspecified.
  • @throws IllegalArgumentException if {@code capacity < 1}
    */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity]; //創建指定容量的數組
        lock = new ReentrantLock(fair); //默認使用非公平鎖
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity, the specified access policy and initially containing the
  • elements of the given collection,
  • added in traversal order of the collection‘s iterator.
  • @param capacity the capacity of this queue
  • @param fair if {@code true} then queue accesses for threads blocked
  • on insertion or removal, are processed in FIFO order;
  • if {@code false} the access order is unspecified.
  • @param c the collection of elements to initially contain 使用指定集合初始化隊列
  • @throws IllegalArgumentException if {@code capacity} is less than
  • {@code c.size()}, or less than 1.
  • @throws NullPointerException if the specified collection or any
  • of its elements are null
    */
    //這個構造函數的核心就是c.size()與capacity的大小關系對比了
    //如果c.size()>capacity那就會報錯,所以在初始化的時候要註意
    public ArrayBlockingQueue(int capacity, boolean fair,
    Collection<? extends E> c) {
    this(capacity, fair); //先創建指定容量的數組,以便集合中的元素存放
    //這種寫法我們很常見,使用final表示引用不能改變,但又避免了直接使用成員變量
    final ReentrantLock lock = this.lock;
    //對隊列直接修改操作,需要先獲取獨占鎖
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
    int i = 0;
    try {
    for (E e : c) {
    checkNotNull(e);
    items[i++] = e; //下標從0開始存放
    }
    } catch (ArrayIndexOutOfBoundsException ex) {
    throw new IllegalArgumentException();
    }
    count = i; //將數組元素個數返回給全局變量
    putIndex = (i == capacity) ? 0 : i; //修改下一次將被放入值的下標
    } finally {
    lock.unlock(); //解鎖
    }
    }

ArrayBlockingQueue源碼閱讀(1.8)