Java 集合框架分析:PriorityBlockingQueue java1.8
哈哈,終於有了第二篇部落格了,終於知道編輯一個部落格需要注意什麼了,希望堅持下去,每天看點小原始碼!
目錄
1.簡述PriorityBlockingQueue
2.主要方法及實現
3.使用過程中需要注意的地方
4.和其他的相關容器的比較
5.總結
簡述PriorityBlockingQueue
特點:
1.屬於併發安全的集合。(什麼是併發安全的集合:即多執行緒的情況下,不會出現不確定的狀態)。
2.無界的佇列。
3.它的blocking表現在取元素時,如果佇列為空,則取元素的執行緒會阻塞。
4.不允許null元素
主要方法及實現
1.主要的成員
同Priorityqueue,使用底層陣列儲存資料,擁有兩把鎖,一個可重入鎖,一個自旋鎖。
private final ReentrantLock lock; /** * Condition for blocking when empty */ private final Condition notEmpty; /** * Spinlock for allocation, acquired via CAS. */ private transient volatile int allocationSpinLock; private transient Object[] queue; /** * The number of elements in the priority queue. */ private transient int size;
2.插入
步驟:
a.null檢測
b.因為priorityblockingQueue是執行緒安全的,所以,插入刪除都是需要加鎖的。這裡先進行加鎖。
c.如果需要擴容,則先擴容。關於擴容操作,一會再說。
d.插入元素,調整順序(和priorityqueue是一樣的)
e.傳送資訊,啟用阻塞的取資料的執行緒
f.釋放鎖(必須的,因為不是用的Synchronized,而是用的lock進行控制的)
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
3.擴容操作
擴容發生在你要插入元素時,發現底層陣列大小不夠,則需要擴容。這裡在判斷時,已經獲取的reentrantlock鎖,因為要擴容,說明queue不為空。另一方面,擴容時,需要發生底層陣列的重新複製到新陣列中,而取資料的執行緒當前還不會讀到新加入的資料(先取你之間加入的,這是happen-before原則,你新資料還沒有插入成功,別人是看不到的),所以為了提高併發量,這裡需要先釋放reentrantlock,讓其他的讀執行緒能夠進行(寫執行緒還是會阻塞,因為要寫,還是要進行擴容,會在獲取擴容鎖時阻塞)。突然想到一個問題,如果A寫入時,發現滿了,因此要擴容,所示擴容期間,釋放了鎖。B poll操作,然後C要寫入,但是此時容量是允許的,這不就讓C在A之前了麼?這個問題是什麼情況?
這裡進行擴容使用的是CAS鎖,當獲取鎖不成功時,說明有其他執行緒在擴容,則等待。在成功擴容之後,需要重新獲取主鎖(即reentrantlock),然後修改queue底層陣列引用。
private void tryGrow(Object[] array, int oldCap)
{
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1))
{
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0)
{ // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array)
{
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
4.刪除元素
刪除元素有多個版本:
不阻塞的:poll()(如果沒有元素可用,則直接返回null)
阻塞的:take()
等待一段時間的:poll(long timeout, TimeUnit unit)
實現都大概相同:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
使用過程中需要注意的地方
1.priorityblockingqueue在操作佇列時,都是共用的一把鎖(在擴容時,用到了自旋鎖,會釋放一段主鎖,然後重新獲取)
2.peek,offer,poll,size等都是要獲取同一把鎖的,效率不是很高
3.在序列化時,為了提供效率,會先將資料放入到priorityqueue中,然後一次性加入到阻塞佇列中,增加操作效率(不用每次都獲取鎖)
和其他的相關容器的比較
和priorityqueue基本相同,處理有加鎖的操作外。
總結
除非是在多執行緒中,否則不要使用,幾乎所有操作都要競爭同一把鎖。