併發容器學習—ArrayBlockingQueue
阿新 • • 發佈:2019-05-02
一、ArrayBlockingQueue併發容器
1.ArrayBlockingQueue的繼承體系
見名知義,ArrayBlockingQueue是個由陣列支援的有界阻塞佇列。也遵循先進先出(FIFO)的原則,也就是說ArrayBlockingQueue的容量是有限的,並不能像ArrayList那樣自動擴容。ArrayBlockingQueue的繼承關係如下圖所示:
其中除了BlockingQueue介面未接觸過,其餘的類和介面都已經分析過,不在贅言。
public interface BlockingQueue<E> extends Queue<E> { //將指定的元素新增到隊尾,若佇列容量已滿,則拋異常 boolean add(E e); //將指定的元素新增到隊尾,若佇列容量已滿,那麼放棄新增,返回false boolean offer(E e); //將指定的元素新增到隊尾,若佇列已滿,那麼等待佇列有容量時在新增 //可被中斷 void put(E e) throws InterruptedException; //在一定時間內嘗試將指定元素新增到隊尾,若超過指定時間仍未新增成功, //則放棄新增,返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //若佇列不為空,則獲取並移除隊首,若佇列為空,則等待佇列有元素可用 //可被中斷 E take() throws InterruptedException; //在一定時間內嘗試獲取並移除隊首元素,若超過指定時間仍無可用元素, //則返回null E poll(long timeout, TimeUnit unit) throws InterruptedException; //返回佇列的剩餘容量,不包含阻塞中的元素 int remainingCapacity(); //刪除佇列中的元素o boolean remove(Object o); //判斷佇列中是否含有元素o public boolean contains(Object o); //將佇列中的所有元素轉移到容器c中,佇列中不留任何元素 int drainTo(Collection<? super E> c); //將佇列中最多maxElements個元素刪除並轉移到容器c中 int drainTo(Collection<? super E> c, int maxElements); }
2.重要的屬性及構造方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //底層資料儲存的陣列 final Object[] items; //隊首元素在陣列中的索引 int takeIndex; //隊尾元素在陣列中的索引 int putIndex; //佇列中元素的個數 int count; //重入鎖,用於同步操作 final ReentrantLock lock; //條件,允許出隊的條件 private final Condition notEmpty; //允許入隊的條件 private final Condition notFull; //迭代器 transient Itrs itrs = null; //建立一個容量為capacity的預設佇列(不公平的佇列) public ArrayBlockingQueue(int capacity) { this(capacity, false); } //建立一個容量為capacity的佇列,是否公平由fair決定 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //陣列大小 lock = new ReentrantLock(fair); //公平或不公平鎖 notEmpty = lock.newCondition(); //出隊條件 notFull = lock.newCondition(); //入隊條件 } //建立一個容量為capacity的佇列,是否公平由fair決定,且佇列中 //包含有容器c中的元素 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //加鎖 try { int i = 0; try { for (E e : c) { checkNotNull(e); //元素不能為null items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } }
3.入隊過程
在ArrayBlockingQueue中入隊的方法有三個:add、offer和put,都是將元素新增到隊尾,我們逐個來看:
//新增元素到隊尾,若佇列沒有剩餘容量,則放棄新增 public boolean offer(E e) { //入隊元素不能為null,這裡說明ArrayBlockingQueue中不允許有null元素存在 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); //加鎖 try { //判斷佇列中的元素個數是否超過佇列容量 //若佇列沒有剩餘容量存放元素,那麼就放棄新增 if (count == items.length) return false; else { enqueue(e); //入隊方法 return true; } } finally { lock.unlock(); //解鎖 } } /** * 入隊方法 * ArrayBlockingQueue的入隊和出隊操作是典型的生產者消費者模式的使用 * ArrayBlockingQueue也是個典型生產者消費者佇列,當佇列中沒有元素(產品) * 時,會掛起或拒絕所有的出隊(消費)操作;當佇列中的元素滿了時,會掛起或拒 * 絕所有入隊(生產)操作。而當佇列中有元素時,則會喚醒或接受所有出隊操作 * 同理,佇列中還有空間時,則會喚醒或接受所有入隊操作 */ private void enqueue(E x) { //獲取底層陣列引用 final Object[] items = this.items; items[putIndex] = x; //將元素新增到隊尾 //判斷隊尾索引是否超過陣列的長度 /** * ArrayBlockingQueue中底層陣列採用環形陣列來 * 實現陣列的重複利用,putIndex永遠指向隊尾元素的索引 * 因此當putIndex的值等於陣列長度時,說明已經到達陣列的 * 最大索引,需要回到0的索引位置了 */ if (++putIndex == items.length) putIndex = 0; count++; //元素個數+1 notEmpty.signal(); //喚醒執行出隊操作的執行緒 } //新增元素到隊尾,若佇列沒有剩餘容量,則丟擲異常 public boolean add(E e) { return super.add(e); //父類方法 } //父類中的add方法 public boolean add(E e) { //執行子類的offer新增方法判斷新增是否成功 if (offer(e)) return true; else //新增失敗丟擲異常 throw new IllegalStateException("Queue full"); } //在一定時間內嘗試將指定元素新增到隊尾,若超過指定時間仍未新增成功, //則放棄新增,返回false public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); //不允許新增null long nanos = unit.toNanos(timeout); //時間換算 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可被打斷的加鎖 try { //迴圈判斷佇列是否已滿 /** * 迴圈判斷是為防止‘虛假喚醒’,即有多個入隊操作執行緒被喚醒 * 但若此時只有一個剩餘容量,那麼這多個入隊操作只有一個應該被執行 * 其他的入隊操作仍應該繼續等待,如果使用if來進行一次性的判斷 * 那麼就會造成在只有一個一個剩餘容量的情況下,執行多次入隊操作 * 這是不允許的,因此應該使用while來繼續判斷佇列是否已滿,防止出現 * 虛假喚醒。 */ while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); //等待nanos時間 } enqueue(e); return true; } finally { lock.unlock(); //解鎖 } } //將指定的元素新增到隊尾,若佇列已滿,那麼等待佇列有容量時在新增 //可被中斷 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可被中斷的加鎖 try { //判斷佇列是否已滿,若是佇列已滿, //那麼執行入隊操作的當前執行緒進入條件佇列(條件 //佇列指的是重入鎖底層AQS同步器中的執行緒條件佇列, //不是當前的元素佇列)中等待,只有佇列有剩餘空間時 //才會被喚醒嘗試入隊,這裡也存在虛假喚醒的可能, //因此也需要使用while對喚醒的執行緒進行在一次的判斷 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
4.出隊過程
在ArrayBlockingQueue中的出隊方法有poll和take方。
//將隊首元素移除出隊並返回,若佇列為空,則返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//真正執行出隊操作的方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //獲取隊尾元素
items[takeIndex] = null; //將隊尾元素從陣列中移除
//判斷隊尾元素的索引takeIndex是否超過陣列的邊界
//超過陣列索引最大值,則回到陣列的索引起點0
if (++takeIndex == items.length)
takeIndex = 0;
count--; //佇列中元素-1
//判斷是否使用了佇列的迭代器功能
if (itrs != null)
itrs.elementDequeued(); //整理迭代器
//執行了一個出隊操作,佇列必不可能滿,那麼可以喚醒一個入隊執行緒
notFull.signal();
return x;
}
//內部類Itrs中的整理迭代器結點的方法
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0) //佇列中沒有元素時,需要清空所有迭代器
queueIsEmpty();
else if (takeIndex == 0)
//隊尾索引指向索引0時需要將陣列的迴圈次數+1
//然後對所有的迭代器進行清理,清除耗盡了的迭代器
takeIndexWrapped();
}
//在一定時間內嘗試獲取並移除隊首元素,若超過指定時間仍無可用元素,
//則返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //可被中斷的加鎖
try {
//判斷佇列是否為空,若佇列為空則等待一定時間
while (count == 0) {
if (nanos <= 0) //判斷等待時間是否合法
return null;
nanos = notEmpty.awaitNanos(nanos); //等待一定時間
}
return dequeue();
} finally {
lock.unlock();
}
}
//移除並返回隊首元素,若佇列中沒有剩餘空間,那當前
//出隊操作的執行緒進入AQS條件等待佇列中等待條件滿足被喚醒
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//佇列為空,進入等待,執行緒掛起
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
5.peek方法
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock(); //加鎖
try {
return itemAt(takeIndex); //返回索引處的元素
lock.unlock();
}
}
//獲取陣列i索引的元素
final E itemAt(int i) {
return (E) items[i];
}
6.remove方法
public boolean remove(Object o) {
//佇列中不存在null元素,要刪除null,直接失敗
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判斷佇列是否是空佇列,若是空佇列那麼
//就不需要去佇列中查詢,直接刪除失敗
//否則,再去佇列中查詢
if (count > 0) {
final int putIndex = this.putIndex; //獲取隊尾索引
int i = takeIndex; //獲取隊首索引
//遍歷隊首到隊尾直接的所有元素,查詢是否存在要刪除的元素
do {
if (o.equals(items[i])) {
removeAt(i); //找到要刪除的元素,執行刪除方法
return true;
}
if (++i == items.length) //判斷索引是否越界,越界歸零
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
//將索引處元素從陣列中移除,並整理陣列
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//若要刪除的元素索引正好是隊尾元素,那麼直接刪除替換成null即可
if (removeIndex == takeIndex) {
items[takeIndex] = null; //刪除元素
if (++takeIndex == items.length) //索引越界歸零
takeIndex = 0;
count--; //元素個數-1
if (itrs != null) //整理迭代器
itrs.elementDequeued();
} else {
//要刪除的元素索引不為隊尾元素,那麼刪除的元素空出來的位置需要將後續
//到隊尾的元素全都索引+1
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length) //下個索引是否越界,越界歸零
next = 0;
//判斷移動陣列中索引是否到達隊尾
//不是隊尾索引的話,將下個索引中元素移到當前索引即可
//到達隊尾索引,則要賦為null,並且隊尾索引要改變成當前索引
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--; //元素個數-1
if (itrs != null) //整理迭代器
itrs.removedAt(removeIndex);
}
notFull.signal(); //喚醒入隊操作
}
7.size方法
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count; //佇列元素個數
} finally {
lock.unlock();
}
}