Java原始碼閱讀之PriorityBlockingQueue
阿新 • • 發佈:2019-02-05
Summary:
- public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
- 本類與普通的PriorityBlockingQueue類中的方法基本一致,區別在於本類的方法都會在方法執行前先加鎖執行完再解鎖;
- add、put會呼叫notEmpty.signal();喚醒等待的執行緒
- take如果遇到佇列為空則;呼叫await()方法,阻塞知道被喚醒
Fields:
private final ReentrantLock lock; //鎖 private final Condition notEmpty; //條件鎖,用於佇列為空時阻塞 private transient volatile int allocationSpinLock; private transient Comparator<? super E> comparator; //比較器 private transient Object[] queue;
Constructor:
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
put&add
//該方法會阻塞,新增成功返回true,否則爆出異常 //新增成功會觸發notEmpty.signal();使得原本在讀取資料時阻塞的執行緒能夠解除阻塞 public void put(E e) { offer(e); } public boolean add(E e) { return offer(e); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); //此處上鎖,所以offer方法會產生阻塞 int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); //擴容,直到擴容成功才退出迴圈,注意這裡擴容的工作並不一定是本執行緒完成的,具體原因看擴容方法tryGrow try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); //調整堆中資料,與普通優先佇列無異 else siftUpUsingComparator(n, e, array, cmp); //調整堆中資料,與普通優先佇列無異 size = n + 1; notEmpty.signal(); //解除空佇列的條件阻塞 } finally { lock.unlock(); //解鎖 } return true; }
take
//從佇列中讀取一個數據,如果佇列沒有資料則阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
poll
//該方法只是嘗試去從佇列中獲取第一個數,如果沒有則返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
擴容操作;
該操作先釋放鎖;最後再嘗試獲取鎖 然後很多線可能都來請求擴容,先進行CAS操作判斷當前是否只有自己一個執行緒在擴容; 如果是則正常情況擴容; 不是則直到只有自己一個執行緒再去正常擴容; 正常庫容還會判斷自己是否是第一次在這裡擴容,如果不是則不操作System.arraycopy()private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //先釋放鎖
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield(); //如果有桶等級的執行緒在等待,可以叫它們先幹活,當前執行緒休息下
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
UNSAFE
上面出現的UNSAFE是一種非阻塞同步機制;
大概意思就是先操作,如果達不到預期目的再操作一次,直到成功為止;
CAS操作;只有通過bootstrap ClassLoader載入的class才能訪問它;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = PriorityBlockingQueue.class;
allocationSpinLockOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("allocationSpinLock"));
} catch (Exception e) {
throw new Error(e);
}
}