JDK容器與併發—Queue—PriorityBlockingQueue
概述
基於優先堆的無界阻塞佇列,PriorityQueue的執行緒安全版本。
資料結構
基於陣列的平衡二叉堆,在PriorityQueue基礎上,增加了一把鎖、一個條件:
private transient Object[] queue; // 增刪查公用的鎖 private final ReentrantLock lock; // 佇列為空時,阻塞take/poll執行緒的條件 private final Condition notEmpty; // 自旋鎖,用CAS方式獲取,用於動態擴充套件queue private transient volatile int allocationSpinLock;
構造器
與PriorityQueue幾乎一樣,除了需要對lock、notEmpty初始化:
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); // lock、notEmpty初始化 this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify(); }
增刪查
容器調整策略(避免無限制擴充套件)
步驟(與PriorityQueue大致相同,除了採用自旋鎖的方式動態分配陣列,在獲取公用鎖下複製queue):
1)當queue已滿,若有元素入隊請求,則進行容量擴充套件;
2)在獲取公用鎖lock的前提下,釋放lock,採用自旋鎖的方式動態擴充套件陣列,允許與take/poll執行緒併發,完成分配後再重新獲取lock;
3)oldCap小於64則容量翻倍;否則增長50%;
4)檢查newCap是否在MAX_ARRAY_SIZE範圍內,若minCap有overflow或大於MAX_ARRAY_SIZE,丟擲OutOfMemoryError異常;否則容量最大不超過MAX_ARRAY_SIZE;
5)動態分配新容量的Object[];
6)獲取公用鎖,將舊queue中的元素複製過來。
while ((n = size) >= (cap = (array = queue).length)) // while使用是為了採用自旋鎖進行擴充套件queue
tryGrow(array, cap);
// 動態擴充套件queue
// 釋放公用鎖,採用自旋鎖,允許擴充套件過程中與take/poll執行緒併發,避免其在該過程中的等待
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // 釋放公用鎖
Object[] newArray = null;
if (allocationSpinLock == 0 && // 採用CAS方式獲取allocationSpinLock,進行動態分配
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // 其他執行緒正進行擴充套件,當前執行緒yield
Thread.yield();
lock.lock(); // 重新獲取公用鎖
if (newArray != null && queue == array) { // 複製佇列
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
基礎方法
與PriorityQueue一樣,除了增加兩個引數:Object[] array、Comparator<? super T> cmp,以保證併發性:
// 對元素x,從k往前移,保持二叉堆的平衡性
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
// 對元素x,從k往後移,保持二叉堆的平衡性
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
增
步驟(過程與PriorityQueue一樣,除了考慮併發性加鎖):
1)獲取公用鎖lock;
2)檢查佇列是否已滿,若滿則採用自旋鎖的方式進行容量擴充套件;
3)從隊尾,對元素進行siftUp,保持二叉堆的平衡性;
4)向take/poll執行緒傳送notEmpty訊號;
5)釋放鎖lock;
6)返回true。
另外,由於PriorityBlockingQueue無界,add、put操作都直接委託給offer進行:
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
刪
步驟(過程與PriorityQueue一樣,除了考慮併發性加鎖):
1)獲取公用鎖lock;
2)檢查佇列是否為空,為空則返回null;
3)取出佇列最後一個元素,從索引0開始,對其進行siftDown,保持二叉堆的平衡性;
4)釋放鎖lock;
5)返回隊首元素值。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
查
步驟(過程與PriorityQueue一樣,除了考慮併發性加鎖):
1)獲取公用鎖lock;
2)檢查佇列是否為空,為空則返回null;
3)釋放鎖lock;
4)返回隊首元素值。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
迭代器
不保證元素的迭代順序,基於底層陣列的副本實現:
public Iterator<E> iterator() {
return new Itr(toArray());
}
final class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
特性
PriorityBlockingQueue中優先順序相同的元素處理
若多個元素的優先順序相同,則其順序是不固定的,可以採用二級比較方法來進一步排序,以下示例為按照元素的入隊順序進行二級比較:
class FIFOEntry<E extends Comparable<? super E>>
implements Comparable<FIFOEntry<E>> {
static final AtomicLong seq = new AtomicLong(0);
final long seqNum;
final E entry;
public FIFOEntry(E entry) {
seqNum = seq.getAndIncrement();
this.entry = entry;
}
public E getEntry() { return entry; }
public int compareTo(FIFOEntry<E> other) {
int res = entry.compareTo(other.entry);
if (res == 0 && other.entry != this.entry)
res = (seqNum < other.seqNum ? -1 : 1);
return res;
}
}
為什麼PriorityBlockingQueue的操作不直接在委託給PriorityQueue基礎上加鎖實現?
allocationSpinLock在動態擴充套件queue上的使用使得委託+lock是實現不了的。
PriorityBlockingQueue就是PriorityQueue的加鎖執行緒安全版。