1. 程式人生 > >併發容器學習—ArrayBlockingQueue

併發容器學習—ArrayBlockingQueue

一、ArrayBlockingQueue併發容器

1.ArrayBlockingQueue的繼承體系

    見名知義,ArrayBlockingQueue是個由陣列支援的有界阻塞佇列。也遵循先進先出(FIFO)的原則,也就是說ArrayBlockingQueue的容量是有限的,並不能像ArrayList那樣自動擴容。ArrayBlockingQueue的繼承關係如下圖所示:

    其中除了BlockingQueue介面未接觸過,其餘的類和介面都已經分析過,不在贅言。

public interface BlockingQueue<E> extends Queue<E> {
    //將指定的元素新增到隊尾,若佇列容量已滿,則拋異常
    boolean add(E e);

    //將指定的元素新增到隊尾,若佇列容量已滿,那麼放棄新增,返回false
    boolean offer(E e);

    //將指定的元素新增到隊尾,若佇列已滿,那麼等待佇列有容量時在新增
    //可被中斷
    void put(E e) throws InterruptedException;

    //在一定時間內嘗試將指定元素新增到隊尾,若超過指定時間仍未新增成功,
    //則放棄新增,返回false
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //若佇列不為空,則獲取並移除隊首,若佇列為空,則等待佇列有元素可用
    //可被中斷
    E take() throws InterruptedException;

    //在一定時間內嘗試獲取並移除隊首元素,若超過指定時間仍無可用元素,
    //則返回null
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //返回佇列的剩餘容量,不包含阻塞中的元素
    int remainingCapacity();

    //刪除佇列中的元素o
    boolean remove(Object o);

    //判斷佇列中是否含有元素o
    public boolean contains(Object o);

    //將佇列中的所有元素轉移到容器c中,佇列中不留任何元素
    int drainTo(Collection<? super E> c);

    //將佇列中最多maxElements個元素刪除並轉移到容器c中
    int drainTo(Collection<? super E> c, int maxElements);
}

2.重要的屬性及構造方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    //底層資料儲存的陣列
    final Object[] items;

    //隊首元素在陣列中的索引
    int takeIndex;

    //隊尾元素在陣列中的索引
    int putIndex;

    //佇列中元素的個數
    int count;
    
    //重入鎖,用於同步操作
    final ReentrantLock lock;

    //條件,允許出隊的條件
    private final Condition notEmpty;

    //允許入隊的條件
    private final Condition notFull;

    //迭代器
    transient Itrs itrs = null;

    //建立一個容量為capacity的預設佇列(不公平的佇列)
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    //建立一個容量為capacity的佇列,是否公平由fair決定
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];    //陣列大小
        lock = new ReentrantLock(fair);    //公平或不公平鎖
        notEmpty = lock.newCondition();    //出隊條件
        notFull =  lock.newCondition();    //入隊條件
    }

    //建立一個容量為capacity的佇列,是否公平由fair決定,且佇列中
    //包含有容器c中的元素
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        lock.lock();     //加鎖
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);    //元素不能為null
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
}

3.入隊過程

    在ArrayBlockingQueue中入隊的方法有三個:add、offer和put,都是將元素新增到隊尾,我們逐個來看:

//新增元素到隊尾,若佇列沒有剩餘容量,則放棄新增
public boolean offer(E e) {
    //入隊元素不能為null,這裡說明ArrayBlockingQueue中不允許有null元素存在
    checkNotNull(e);   
    final ReentrantLock lock = this.lock;
    lock.lock();    //加鎖
    try {
        
        //判斷佇列中的元素個數是否超過佇列容量
        //若佇列沒有剩餘容量存放元素,那麼就放棄新增
        if (count == items.length)
            return false;
        else {
            enqueue(e);    //入隊方法
            return true;
        }
    } finally {
        lock.unlock();    //解鎖
    }
}

/**
* 入隊方法
* ArrayBlockingQueue的入隊和出隊操作是典型的生產者消費者模式的使用
* ArrayBlockingQueue也是個典型生產者消費者佇列,當佇列中沒有元素(產品)
* 時,會掛起或拒絕所有的出隊(消費)操作;當佇列中的元素滿了時,會掛起或拒
* 絕所有入隊(生產)操作。而當佇列中有元素時,則會喚醒或接受所有出隊操作
* 同理,佇列中還有空間時,則會喚醒或接受所有入隊操作
*/
private void enqueue(E x) {
    //獲取底層陣列引用
    final Object[] items = this.items;
    items[putIndex] = x;    //將元素新增到隊尾

    //判斷隊尾索引是否超過陣列的長度
    /**
    * ArrayBlockingQueue中底層陣列採用環形陣列來
    * 實現陣列的重複利用,putIndex永遠指向隊尾元素的索引
    * 因此當putIndex的值等於陣列長度時,說明已經到達陣列的
    * 最大索引,需要回到0的索引位置了
    */
    if (++putIndex == items.length)
        putIndex = 0;
    count++;    //元素個數+1
    notEmpty.signal();    //喚醒執行出隊操作的執行緒
}

//新增元素到隊尾,若佇列沒有剩餘容量,則丟擲異常
public boolean add(E e) {
    return super.add(e);    //父類方法
}

//父類中的add方法
public boolean add(E e) {
    //執行子類的offer新增方法判斷新增是否成功
    if (offer(e))    
        return true;
    else
        //新增失敗丟擲異常
        throw new IllegalStateException("Queue full");
}

//在一定時間內嘗試將指定元素新增到隊尾,若超過指定時間仍未新增成功,
//則放棄新增,返回false
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);    //不允許新增null
    long nanos = unit.toNanos(timeout);    //時間換算
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();    //可被打斷的加鎖
    try {
        //迴圈判斷佇列是否已滿
        /**
        * 迴圈判斷是為防止‘虛假喚醒’,即有多個入隊操作執行緒被喚醒
        * 但若此時只有一個剩餘容量,那麼這多個入隊操作只有一個應該被執行
        * 其他的入隊操作仍應該繼續等待,如果使用if來進行一次性的判斷
        * 那麼就會造成在只有一個一個剩餘容量的情況下,執行多次入隊操作
        * 這是不允許的,因此應該使用while來繼續判斷佇列是否已滿,防止出現
        * 虛假喚醒。
        */
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);    //等待nanos時間
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();    //解鎖
    }
}

//將指定的元素新增到隊尾,若佇列已滿,那麼等待佇列有容量時在新增
//可被中斷
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();    //可被中斷的加鎖
    try {

        //判斷佇列是否已滿,若是佇列已滿,
        //那麼執行入隊操作的當前執行緒進入條件佇列(條件
        //佇列指的是重入鎖底層AQS同步器中的執行緒條件佇列,
        //不是當前的元素佇列)中等待,只有佇列有剩餘空間時
        //才會被喚醒嘗試入隊,這裡也存在虛假喚醒的可能,
        //因此也需要使用while對喚醒的執行緒進行在一次的判斷
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

4.出隊過程

    在ArrayBlockingQueue中的出隊方法有poll和take方。

//將隊首元素移除出隊並返回,若佇列為空,則返回null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

//真正執行出隊操作的方法
private E dequeue() {

    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];    //獲取隊尾元素
    items[takeIndex] = null;    //將隊尾元素從陣列中移除

    //判斷隊尾元素的索引takeIndex是否超過陣列的邊界
    //超過陣列索引最大值,則回到陣列的索引起點0
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;    //佇列中元素-1

    //判斷是否使用了佇列的迭代器功能
    if (itrs != null)
        itrs.elementDequeued();    //整理迭代器

    //執行了一個出隊操作,佇列必不可能滿,那麼可以喚醒一個入隊執行緒
    notFull.signal();    
    return x;
}

//內部類Itrs中的整理迭代器結點的方法
void elementDequeued() {
    // assert lock.getHoldCount() == 1;
    if (count == 0)    //佇列中沒有元素時,需要清空所有迭代器
        queueIsEmpty();
    else if (takeIndex == 0)    
        //隊尾索引指向索引0時需要將陣列的迴圈次數+1
        //然後對所有的迭代器進行清理,清除耗盡了的迭代器
        takeIndexWrapped();
}

//在一定時間內嘗試獲取並移除隊首元素,若超過指定時間仍無可用元素,
//則返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();    //可被中斷的加鎖
    try {

        //判斷佇列是否為空,若佇列為空則等待一定時間
        while (count == 0) {
            if (nanos <= 0)    //判斷等待時間是否合法
                return null;
            nanos = notEmpty.awaitNanos(nanos);    //等待一定時間
        }
        return dequeue();    
    } finally {
        lock.unlock();
    }
}

//移除並返回隊首元素,若佇列中沒有剩餘空間,那當前
//出隊操作的執行緒進入AQS條件等待佇列中等待條件滿足被喚醒
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //佇列為空,進入等待,執行緒掛起
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

5.peek方法

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();    //加鎖
    try {
        return itemAt(takeIndex);     //返回索引處的元素
        lock.unlock();
    }
}

//獲取陣列i索引的元素
final E itemAt(int i) {
    return (E) items[i];
}

6.remove方法

public boolean remove(Object o) {

    //佇列中不存在null元素,要刪除null,直接失敗
    if (o == null) return false;    
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //判斷佇列是否是空佇列,若是空佇列那麼
        //就不需要去佇列中查詢,直接刪除失敗
        //否則,再去佇列中查詢
        if (count > 0) {
            final int putIndex = this.putIndex;    //獲取隊尾索引
            int i = takeIndex;    //獲取隊首索引

            //遍歷隊首到隊尾直接的所有元素,查詢是否存在要刪除的元素
            do {
                if (o.equals(items[i])) {
                    removeAt(i);    //找到要刪除的元素,執行刪除方法
                    return true;
                }
                if (++i == items.length)    //判斷索引是否越界,越界歸零
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

//將索引處元素從陣列中移除,並整理陣列
void removeAt(final int removeIndex) {
    
    final Object[] items = this.items;

    //若要刪除的元素索引正好是隊尾元素,那麼直接刪除替換成null即可
    if (removeIndex == takeIndex) {
        
        items[takeIndex] = null;    //刪除元素
        if (++takeIndex == items.length)    //索引越界歸零
            takeIndex = 0;
        count--;    //元素個數-1
        if (itrs != null)    //整理迭代器
            itrs.elementDequeued();
    } else {
        
        //要刪除的元素索引不為隊尾元素,那麼刪除的元素空出來的位置需要將後續
        //到隊尾的元素全都索引+1
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)    //下個索引是否越界,越界歸零
                next = 0;

            //判斷移動陣列中索引是否到達隊尾
            //不是隊尾索引的話,將下個索引中元素移到當前索引即可
            //到達隊尾索引,則要賦為null,並且隊尾索引要改變成當前索引
            if (next != putIndex) {    
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;    //元素個數-1
        if (itrs != null)        //整理迭代器
            itrs.removedAt(removeIndex);
    }
    notFull.signal();    //喚醒入隊操作
}

7.size方法

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;    //佇列元素個數
    } finally {
        lock.unlock();
    }
}