併發佇列-無界阻塞優先順序佇列PriorityBlockingQueue原理探究
一、 前言
PriorityBlockingQueue是帶優先順序的無界阻塞佇列,每次出隊都返回優先順序最高的元素,是二叉樹最小堆的實現,研究過陣列方式存放最小堆節點的都知道,直接遍歷佇列元素是無序的。
二、 PriorityBlockingQueue類圖結構
如圖PriorityBlockingQueue內部有個陣列queue用來存放佇列元素,size用來存放佇列元素個數,allocationSpinLockOffset是用來在擴容佇列時候做cas的,目的是保證只有一個執行緒可以進行擴容。
由於這是一個優先順序佇列所以有個比較器comparator用來比較元素大小。lock獨佔鎖物件用來控制同時只能有一個執行緒可以進行入隊出隊操作。notEmpty條件變數用來實現take方法阻塞模式。這裡沒有notFull 條件變數是因為這裡的put操作是非阻塞的,為啥要設計為非阻塞的是因為這是無界佇列。
最後PriorityQueue q用來搞序列化的。
如下建構函式,預設佇列容量為11,預設比較器為null;
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue (int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this .queue = new Object[initialCapacity];
}
三、 offer操作
在佇列插入一個元素,由於是無界佇列,所以一直為成功返回true;
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//如果當前元素個數>=佇列容量,則擴容(1)
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
//預設比較器為null
if (cmp == null)(2)
siftUpComparable(n, e, array);
else
//自定義比較器(3)
siftUpUsingComparator(n, e, array, cmp);
//佇列元素增加1,並且啟用notEmpty的條件佇列裡面的一個阻塞執行緒
size = n + 1;(9)
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
主流程比較簡單,下面看看兩個主要函式
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //must release and then re-acquire main lock
Object[] newArray = null;
//cas成功則擴容(4)
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//oldGap<64則擴容新增oldcap+2,否者擴容50%,並且最大為MAX_ARRAY_SIZE
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
//第一個執行緒cas成功後,第二個執行緒會進入這個地方,然後第二個執行緒讓出cpu,儘量讓第一個執行緒執行下面點獲取鎖,但是這得不到肯定的保證。(5)
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();(6)
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
tryGrow目的是擴容,這裡要思考下為啥在擴容前要先釋放鎖,然後使用cas控制只有一個執行緒可以擴容成功。我的理解是為了效能,因為擴容時候是需要花時間的,如果這些操作時候還佔用鎖那麼其他執行緒在這個時候是不能進行出隊操作的,也不能進行入隊操作,這大大降低了併發性。
所以在擴容前釋放鎖,這允許其他出隊執行緒可以進行出隊操作,但是由於釋放了鎖,所以也允許在擴容時候進行入隊操作,這就會導致多個執行緒進行擴容會出現問題,所以這裡使用了一個spinlock用cas控制只有一個執行緒可以進行擴容,失敗的執行緒呼叫Thread.yield()讓出cpu,目的意在讓擴容執行緒擴容後優先呼叫lock.lock重新獲取鎖,但是這得不到一定的保證,有可能呼叫Thread.yield()的執行緒先獲取了鎖。
那copy元素資料到新陣列為啥放到獲取鎖後面那?原因應該是因為可見性問題,因為queue並沒有被volatile修飾。另外有可能在擴容時候進行了出隊操作,如果直接拷貝可能看到的陣列元素不是最新的。而通過呼叫Lock後,獲取的陣列則是最新的,並且在釋放鎖前 陣列內容不會變化。
具體建堆演算法:
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
//佇列元素個數>0則判斷插入位置,否者直接入隊(7)
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;(8)
}
下面用圖說話模擬下過程:
假設佇列容量為2
- 第一次offer(2)時候
執行(1)為false所以執行(2),由於k=n=size=0;所以執行(8)元素入隊,然執行(9)size+1;
現在佇列狀態:
- 第二次offer(4)時候
執行(1)為false,所以執行(2)由於k=1,所以進入while迴圈,parent=0;e=2;key=4;key>e所以break;然後把4存到資料下標為1的地方,這時候佇列狀態為:
- 第三次offer(6)時候
執行(1)為true,所以呼叫tryGrow,由於2<64所以newCap=2 + (2+2)=6;然後建立新陣列並拷貝,然後呼叫siftUpComparable;k=2>0進入迴圈 parent=0;e=2;key=6;key>e所以break;然後把6放入下標為2的地方,現在佇列狀態:
- 第四次offer(1)時候
執行(1)為false,所以執行(2)由於k=3,所以進入while迴圈,parent=0;e=2;key=1; key<e;所以把2複製到陣列下標為3的地方,然後k=0退出迴圈;然後把2存放到下標為0地方,現在狀態:
四、 poll操作
在佇列頭部獲取並移除一個元素,如果佇列為空,則返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
主要看dequeue
private E dequeue() {
//佇列為空,則返回null
int n = size - 1;
if (n < 0)
return null;
else {
//獲取隊頭元素(1)
Object[] array = queue;
E result = (E) array[0];
//獲取對尾元素,並值null(2)
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)//cmp=null則呼叫這個,把對尾元素位置插入到0位置,並且調整堆為最小堆(3)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;(4)
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];(5)
int right = child + 1;(6)
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
c = array[child = right];
if (key.compareTo((T) c) <= 0)(8)
break;
array[k] = c;
k = child;
}
array[k] = key;(9)
}
}
下面用圖說話模擬下過程:
- 第一次呼叫poll()
首先執行(1) result=1;然後執行(2)x=2;這時候佇列狀態
然後執行(3)後狀態為:
執行(4)後的結果:
下面重點說說siftDownComparable這個屌屌的建立最小堆的演算法:
首先說下思想,其中k一開始為0,x為數組裡面最後一個元素,由於第0個元素為樹根,被出隊時候要被搞掉,所以建堆要從它的左右孩子節點找一個最小的值來當樹根,子樹根被搞掉後,會找子樹的左右孩子最小的元素來代替,直到樹節點為止,還不明白,沒關係,看圖說話:
假如當前佇列元素:
那麼對於樹為:
這時候如果呼叫了poll();那麼result=2;x=11;現在樹為:
然後看leftChildVal = 4;rightChildVal = 6; 4<6;所以c=4;也就是獲取根節點的左右孩子值小的那一個; 然後看11>4也就是key>c;然後把c放入樹根,現在樹為:
然後看根的左邊孩子4為根的子樹我們要為這個字樹找一個根節點
看leftChildVal = 8;rightChildVal = 10; 8<10;所以c=8;也就是獲取根節點的左右孩子值小的那一個; 然後看11>8也就是key>c;然後把c放入樹根,現在樹為:
這時候k=3;half=3所以推出迴圈,執行(9)後結果為:
這時候佇列為:
五、 put操作
內部呼叫的offer,由於是無界佇列,所以不需要阻塞
public void put(E e) {
offer(e); // never need to block
}
六、 take操作
獲取佇列頭元素,如果佇列為空則阻塞。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//如果佇列為空,則阻塞,把當前執行緒放入notEmpty的條件佇列
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
這裡是阻塞實現,阻塞後直到入隊操作呼叫notEmpty.signal 才會返回。
七、 size操作
獲取佇列元個數,由於加了獨佔鎖所以返回結果是精確的
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
八、 開源框架中使用
目前還沒找到..
九、總結
PriorityBlockingQueue始終保證出隊的元素是優先順序最高的元素而不是在佇列裡面停留時間最長的原始,並且可以定製優先順序的規則,內部通過使用一個二叉樹最小堆演算法來維護內部陣列,這個陣列是可擴容的,噹噹前元素個數>=最大容量時候會通過演算法擴容,當佇列任務裡面的任務由優先順序時候本佇列比較實用。
PriorityBlockingQueue類似於ArrayBlockingQueue內部使用一個獨佔鎖來控制同時只有一個執行緒可以進行入隊和出隊,另外前者只使用了一個notEmpty條件變數而沒有notFull這是因為前者是無界佇列,當put時候永遠不會處於await所以也不需要被喚醒,並且take方法由於是阻塞方法,所以是可被中斷的,其他方法對中斷標誌不理會。
值得注意的是為了避免在擴容操作時候其他執行緒不能進行出隊操作,實現上使用了先釋放鎖,然後通過cas保證同時只有一個執行緒可以擴容成功。