ArrayBlockingQueue原始碼淺析
阿新 • • 發佈:2018-12-13
一個由陣列結構組成的有界阻塞佇列
構造方法
public ArrayBlockingQueue(int capacity) { this(capacity, false); } // 初始化陣列,例項化ReentrantLock和兩個等待佇列notEmpty、notFull public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
入隊
// 新增元素,通過多型呼叫offer方法 public boolean add(E e) { return super.add(e); } // 新增元素(如果陣列已滿就取消新增) public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; // 獲取鎖 lock.lock(); try { // 如果陣列已滿,就取消新增元素 if (count == items.length) return false; else { // 新增元素到陣列 enqueue(e); return true; } } finally { // 釋放鎖 lock.unlock(); } } // 阻塞式新增元素,直至新增成功 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 如果陣列已滿,就將當前執行緒加入到notFull的等待佇列 notFull.await(); // 新增元素到陣列 enqueue(e); } finally { lock.unlock(); } } // 新增元素,超時後取消新增 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果陣列已滿,就進入超時等待,如果超時了,就直接返回取消新增 while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } // 新增元素 enqueue(e); return true; } finally { lock.unlock(); } } // 新增元素 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // putIndex記錄的是下一次將要新增的元素的索引位置 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 通知notEmpty的等待佇列的執行緒可以取出元素 notEmpty.signal(); }
出隊
// 取出元素,如果陣列中沒有元素,則返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } // 阻塞式獲取元素,直至取出元素 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try // 如果當前數組裡元素個數為0,則當前執行緒進入到notEmpty的等待佇列 while (count == 0) notEmpty.await(); // 取出元素 return dequeue(); } finally { lock.unlock(); } } // 取出元素,超時後取消新增 public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果陣列元素個數為0,就進入超時等待,如果超時了,就直接返回 while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } // 取出元素 return dequeue(); } finally { lock.unlock(); } } // 取出元素 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // takeIndex表示下一次取出元素時的索引位置 E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 通知notFull的等待佇列的執行緒可以新增元素 notFull.signal(); return x; }
包含
// 包含
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
// 遍歷從takeIndex到putIndex,如果存在該元素,則返回true
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
刪除
// 刪除
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
// 從takeIndex到putIndex遍歷,如果存在該元素,則刪除
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}