JUC原始碼分析-集合篇(七):PriorityBlockingQueue
PriorityBlockingQueue 是二叉堆結構的無界優先順序阻塞佇列,使用顯示鎖 Lock 保證執行緒安全,是一個執行緒安全的 PriorityQueue。元素的優先順序順序通過 Comparator 實現,內部不可新增不可比較的值。相較於我們前幾章所講的Queue,PriorityBlockingQueue 算是一個老牌的隊列了,從JDK1.5加入JUC行列,如果我們需要對佇列進行優先順序排序,PriorityBlockingQueue 將是一個不錯的選擇。
資料結構及核心引數
//預設容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //陣列最大長度 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //內部儲存元素的陣列,基於二叉堆實現 private transient Object[] queue; //Lock used for all public operations private final ReentrantLock lock; //Condition for blocking when empty private final Condition notEmpty; /**初始0為可獲取狀態,用於控制擴容操作*/ private transient volatile int allocationSpinLock; //比較器 private transient Comparator<? super E> comparator; //內部PriorityQueue引用,用於相容序列化 private PriorityQueue<E> q;
二叉堆
PriorityBlockingQueue(後稱PBQ)的資料由內部的一個 Object 陣列(queue
)儲存,這個陣列本質上是一個二叉堆:
二叉堆
二叉堆是一種特殊的堆,二叉堆是完全二叉樹或者是近似完全二叉樹。二叉堆滿足堆特性:父節點的鍵值總是與任何一個子節點的鍵值保持固定的序關係,且每個節點的子節點也都是一個二叉堆。
當父節點的鍵值總是大於或等於任何一個子節點的鍵值時為最大堆。 當父節點的鍵值總是小於或等於任何一個子節點的鍵值時為最小堆。
二叉堆一般用陣列來表示。如果根節點在陣列中的位置是1,第n個位置的子節點分別在 2*n 和 2*n+1。因此,第1個位置的子節點在2和3,第2個位置的子節點在4和5。以此類推。這種基於1的陣列儲存方式便於尋找父節點和子節點。如果儲存陣列的下標基於0,那麼下標為i的節點的子節點是 2*n + 1 與 2*(n+1);其父節點的下標是 (n − 1)/2,PriorityBlockingQueue 中使用的就是基於0下標的二叉堆。
原始碼解析
如果你理解了我們前幾章的內容,會發現 PBQ 的入隊/出隊操作都很簡單,沒有太複雜的演算法,無非就是對二叉堆的插入/刪除操作。
offer(E e)
//入隊 public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) //佇列擴容 tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //找到合適位置插入元素 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
說明:首先對佇列加鎖,判斷佇列是否需要擴容,如果需要呼叫tryGrow()
方法進行擴容;然後呼叫siftUpComparable()
方法找到合適位置插入元素;更新佇列元素數size
,喚醒等待notEmpty
的執行緒,最後別忘了解除鎖定unlock。
- PBQ 的擴容說明:當佇列元素數大於等於陣列的長度時,會觸發擴容操作,擴容是由單執行緒完成的。如果陣列長度 cap 小於64,擴容長度為 2*(cap+1);否則擴容長度為原來的1.5倍(1.5*cap),
tryGrow()
原始碼如下:
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//計算新的陣列容量(1.5*cap或2*(cap+1))
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
//如果有其他執行緒已經在進行擴容操作,當前執行緒讓出執行時間片
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
siftUpComparable()
和siftUpUsingComparator()
,由於佇列中的元素都是有優先順序(基於comparator
排序)的,所以如果有新元素進來不會像其他佇列一樣直接放在隊尾,而是通過這兩個方法找到新增元素在佇列中的排序位置然後插入,原始碼如下:
//在k位置插入元素x,從父節點開始向上找到合適位置,保持二元堆的性質不變
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
//從父節點開始向上查詢,並保持二叉堆性質
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;//找到合適位置,跳出迴圈
array[k] = e;
k = parent;
}
array[k] = key;
}
//自定義Comparator版本的siftUpComparable
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
//從父節點開始向上查詢,並保持二叉堆性質
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;//找到合適位置,跳出迴圈
array[k] = e;
k = parent;
}
array[k] = x;
}
poll()
//出列
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];//第一個元素,即出列元素
E x = (E) array[n];//最後一個元素
array[n] = null;
Comparator<? super E> cmp = comparator;
//重構二叉堆
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
說明: 在出列操作時,首先移除陣列的最後一個元素,然後呼叫siftDownComparable
或siftDownUsingComparator
方法進行二叉堆的重組,最後返回佇列的第一個元素。
PBQ的出列本質上是刪除二叉堆的根節點,然後,把堆儲存的最後那個節點移到填在根節點處,再從上而下調整父節點與它的子節點。siftDownComparable
和siftDownUsingComparator
原始碼如下:
//在k位置插入元素x,從子節點開始向下調整節點位置,保持二叉堆的性質不變
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
//獲取最後一個節點的父節點
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
//從左葉子節點向下調整
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
小結
本章重點:理解 PriorityBlockingQueue 的優先順序策略實現,二叉堆資料結構。
作者:泰迪的bagwell 連結:https://www.jianshu.com/p/fd26c91cd2a0 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。