併發容器學習—DelayQueue與PriorityBlockingQueue
阿新 • • 發佈:2019-05-02
一、DelayQueue併發容器
1.Delay
Queue的底層實現
Delay
Queue是一個執行緒安全且無界的阻塞佇列,只有在延遲時間滿足後才能獲取佇列中的元素,因此佇列中的元素必須實現Delay介面,在建立元素時指定多久時間後才能從佇列中獲取該元素。Delay
Queue的底層實現是使用了PriorityQueue+ReentrantLock來實現延遲獲取功能。
2.PriorityQueue分析
其中PriorityQueue是種優先順序佇列,執行緒不安全,佇列中的元素會按照優先順序來排序。該佇列底層實現是使用二叉堆,並且元素按照其自然順序進行排序,或者根據構造佇列時提供的Comparator進行排序。因為PriorityQueue中的元素都要進行比較,所以優先順序佇列中不能擁有null元素,也不能有不能比較的元素。
PriorityQueue的繼承關係如下圖:
PriorityQueue中的屬性及構造方法:
public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable { //佇列的預設容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //底層用於存放資料的陣列 transient Object[] queue; // non-private to simplify nested class access //佇列中的元素數量計數 private int size = 0; //比較器 private final Comparator<? super E> comparator; //快速失敗機制使用的變數 transient int modCount = 0; //建立一個預設容量的佇列 public PriorityQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //建立一個指定容量的佇列 public PriorityQueue(int initialCapacity) { this(initialCapacity, null); } //建立一個指定比較器的預設容量佇列 public PriorityQueue(Comparator<? super E> comparator) { this(DEFAULT_INITIAL_CAPACITY, comparator); } //建立一個指定比較器且指定容量佇列 public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) { //判斷指定的容量值是否合法 if (initialCapacity < 1) throw new IllegalArgumentException(); this.queue = new Object[initialCapacity]; //初始化底層陣列 this.comparator = comparator; //比較器初始化 } //建立一個帶有指定集合中的元素的佇列 @SuppressWarnings("unchecked") public PriorityQueue(Collection<? extends E> c) { //判斷c是否是有序集合 //若是有序集合,那麼就以其比較器作為佇列的比較器 if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); initElementsFromCollection(ss); } //判斷集合是否是優先順序佇列 //若是的話,直接使用該佇列的比較器, else if (c instanceof PriorityQueue<?>) { PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); initFromPriorityQueue(pq); } else { this.comparator = null; initFromCollection(c); } } //將容器c中的元素新增到優先順序佇列中 private void initElementsFromCollection(Collection<? extends E> c) { Object[] a = c.toArray(); // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, a.length, Object[].class); int len = a.length; if (len == 1 || this.comparator != null) for (int i = 0; i < len; i++) if (a[i] == null) throw new NullPointerException(); this.queue = a; this.size = a.length; } //將優先順序佇列c中的元素新增到當前優先順序佇列中 private void initFromPriorityQueue(PriorityQueue<? extends E> c) { if (c.getClass() == PriorityQueue.class) { this.queue = c.toArray(); this.size = c.size(); } else { initFromCollection(c); } } //將容器c中的元素新增到優先順序佇列中 private void initFromCollection(Collection<? extends E> c) { initElementsFromCollection(c); heapify(); } //建立包含優先順序佇列c中元素的佇列,且使用同一個比較器 @SuppressWarnings("unchecked") public PriorityQueue(PriorityQueue<? extends E> c) { this.comparator = (Comparator<? super E>) c.comparator(); initFromPriorityQueue(c); } //建立包含排序集合c中元素的優先順序佇列,且使用同一個比較器 @SuppressWarnings("unchecked") public PriorityQueue(SortedSet<? extends E> c) { this.comparator = (Comparator<? super E>) c.comparator(); initElementsFromCollection(c); } }
PriorityQueue中的入隊方法分析:
//add與offer沒有區別 public boolean add(E e) { return offer(e); } public boolean offer(E e) { //佇列中不允許有null元素 if (e == null) throw new NullPointerException(); modCount++; //快速失敗機制 int i = size; //獲取當前佇列中元素個數 //判斷陣列是否需要擴容 if (i >= queue.length) grow(i + 1); size = i + 1; //元素計數+1 //新增元素的插入位置 //若佇列原本為空,則直接放到0位置 //若佇列原本不為空 if (i == 0) queue[0] = e; else siftUp(i, e); //插入陣列 return true; } //擴容 private void grow(int minCapacity) { int oldCapacity = queue.length; //佇列舊容量 //擴容機制,佇列原容量小於64時,擴容為原來的2倍再加2 //大於64,則擴大1.5倍 int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1)); if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); queue = Arrays.copyOf(queue, newCapacity); } //上浮 /** * 上浮過程 * 假設已有一個有序堆(升序)如下所示: * 10 * / \ * 20 40 * / \ / * 60 70 90 * 現在要將元素30插入堆中,則有 * 1.將要插入的30先放在二叉堆的末尾 * 2.再將其與父結點進行比較,判斷是否要上浮(小於父結點就上浮) * 3.若小於父結點則交換位置,再重複第2步驟繼續上浮 * 4.若大於則直接結束上浮 * 10 10 * / \ / \ * 20 40 ——> 20 30 * / \ / \ / \ / \ * 60 70 90 30 60 70 90 40 */ private void siftUp(int k, E x) { //判斷佇列是自然排序還是比較器排序 if (comparator != null) siftUpUsingComparator(k, x); //比較器排序 else siftUpComparable(k, x); //自然排序 } //入隊操作本質是一個堆排序中的一個上浮的過程 private void siftUpUsingComparator(int k, E x) { //判斷索引位置是否大於0,即是否到達堆頂 while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; } //另一個上浮方法,使用的自然排序 private void siftUpComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; }
PriorityQueue中的出隊方法分析:
public E poll() {
if (size == 0) //判斷佇列是否是空佇列
return null;
int s = --size;
modCount++;
E result = (E) queue[0]; //取出隊首元素
E x = (E) queue[s]; //獲取隊尾元素
queue[s] = null; //隊尾賦null
//將原本的隊尾元素放到堆頂,再對整個堆進行排序整理
//即下沉
if (s != 0)
siftDown(0, x); //下沉方法
return result;
}
//下沉
/**
* 下沉過程
* 假設已有一個有序堆(升序)如下所示:
* 10
* / \
* 20 30
* / \ / \
* 60 70 90 40
* 現在要將元素10出隊,則有
* 1.將要出隊的10移除出二叉堆,並將隊尾40放到堆頂
* 2.將堆頂元素與兩個子結點中較小的元素相比較,選擇小的元素作為新的堆頂元素
* 3.重複對堆中前一半結點進行將第2步的比較交換
* 40 20
* / \ / \
* 20 30 ——> 40 30
* / \ / / \ /
* 60 70 90 60 70 90
*/
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x); //比較器下沉
else
siftDownComparable(k, x); //自然排序下沉
}
//使用自然排序下沉
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; //下沉要對堆中前一半的結點都進行
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child]; //獲取當前結點的左孩子
int right = child + 1; //右孩子索引
//若存在右孩子,那麼左右孩子先比較大小,取小再與父結點比較
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
//父結點與子結點比較
//若父結點小於子結點,則直接結束下沉的過程
//否則,互動位置後繼續下沉操作
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
//使用比較器下沉
@SuppressWarnings("unchecked")
private void siftDownUsingComparator(int k, E x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size && comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
3.DelayQueue的繼續體系
瞭解了DelayQueue的底層實際是通過PriorityQueue實現,再來看看DelayQueue的繼承關係,如下圖所示,父類及介面之前的學習中都已分析過,不在贅言。
4.Delay介面
DelayQueue佇列與其他佇列最明顯的不同之處,就是它的延時功能,也正因為這個延時特點,DelayQueue中的物件都必須要實現Delay介面,接下來就看看這個Delay介面是幹什麼的。
//用來標記那些應該在給定延遲時間之後執行的物件
public interface Delayed extends Comparable<Delayed> {
//檢查延遲是否結束,該方法返回一個延遲時間,時間到後在檢查還有沒有
//延遲,若沒有延遲執行下一步,若還有延遲,繼續等待
long getDelay(TimeUnit unit);
}
DelayQueue的使用示例:
/**
* 延遲佇列的使用示例
* 主執行緒建立三個延遲任務放到queue中,其他三個執行緒
* 在任務可用時取出
* Created by bzhang on 2019/4/1.
*/
public class TestDelayed implements Delayed {
private String name;
private Date takeTime; //延遲時間
public TestDelayed(String name, Date takeTime) {
this.name = name;
this.takeTime = takeTime;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Date getTakeTime() {
return takeTime;
}
public void setTakeTime(Date takeTime) {
this.takeTime = takeTime;
}
@Override
public long getDelay(TimeUnit unit) {
long convert = unit.convert(takeTime.getTime()-System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return convert;
}
@Override
public int compareTo(Delayed o) {
TestDelayed t = (TestDelayed)o;
long l = this.takeTime.getTime() - t.getTakeTime().getTime();
if (l==0){
return 0;
}
return l > 0 ? 1 : -1;
}
@Override
public String toString() {
return "TestDelayed{" +
"name='" + name + '\'' +
", takeTime=" + takeTime +
'}';
}
public static void main(String[] args) {
DelayQueue queue = new DelayQueue();
long l = System.currentTimeMillis();
queue.put(new TestDelayed("A",new Date(l+5000)));
queue.put(new TestDelayed("B",new Date(l+2000)));
queue.put(new TestDelayed("C",new Date(l+7000)));
System.out.println(new Date());
int t = 0;
for (int i = 0;i < 3;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
//結果
Tue Apr 02 11:03:33 CST 2019
Thread-1TestDelayed{name='B', takeTime=Tue Apr 02 11:03:35 CST 2019}
Thread-0TestDelayed{name='A', takeTime=Tue Apr 02 11:03:38 CST 2019}
Thread-2TestDelayed{name='C', takeTime=Tue Apr 02 11:03:40 CST 2019}
5.DelayQueue中的重要屬性及構造方法
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//重入鎖,用於保證併發安全
private final transient ReentrantLock lock = new ReentrantLock();
//底層優先順序佇列,實際元素都儲存與該佇列中,底層是陣列構成的二叉堆
private final PriorityQueue<E> q = new PriorityQueue<E>();
//下一個等待獲取元素的執行緒,可減少不必要的等待
private Thread leader = null;
//條件控制,表示是否可以從佇列中取資料
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
}
6.DelayQueue的入隊方法
//add方法本質就是呼叫offer方法,將元素新增到佇列
public boolean add(E e) {
return offer(e);
}
//同上
public void put(E e) {
offer(e);
}
//延遲佇列是無界佇列,指定超時時間放入元素沒有意義,與直接入隊是一樣的
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
//向佇列中新增元素,元素位置以比較結果(compareTo方法)來確定
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); //呼叫底層優先順序佇列的offer方法來儲存元素
//判斷底層優先順序佇列的隊首是否是新增元素
if (q.peek() == e) {
leader = null;
//喚醒條件等待佇列的某一個執行緒,即說明佇列中有元素了,
//可以從佇列中獲取到元素了
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
7.DelayQueue的出隊方法
//返回延遲時間已到的第一個元素,或返回null(沒有元素或元素延遲時間都未到)
public E poll() {
final ReentrantLock lock = this.lock; //重入鎖
lock.lock(); //加鎖同步
try {
E first = q.peek(); //獲取優先順序佇列中的隊首元素
//判斷佇列是否為空,若不為空那麼隊首延遲時間是否到達,若都不滿足
//說明隊首元素可用,返回隊首
//否則返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
//若有延遲時間已到的元素就立即返回,若無則一直等待
//佇列中無元素那麼也一直等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //可被中斷鎖
try {
for (;;) { //自旋
E first = q.peek(); //獲取隊首元素
//若佇列為空,直接進入條件佇列等待喚醒
//佇列不為空,則判斷隊首的延時是否到達
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS); //獲取剩餘延遲時間(單位是ns)
if (delay <= 0) //沒有剩餘延遲時間,則將隊首元素返回
return q.poll();
first = null;
//判斷是否已經有其他執行緒在等待取元素
//若有,那麼就讓當前執行緒直接等待
//若沒有,那就說明當前只有本執行緒在等待獲取隊首元素
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread(); //獲取當前執行緒
leader = thisThread; //將單籤執行緒設為等待獲取隊首的執行緒
try {
//等待隊首元素的延遲時間後,在嘗試獲取隊首元素
available.awaitNanos(delay);
} finally {
//將等待獲取的執行緒設為null,因為當前執行緒正在獲取,因此不應該有leader
//即leader為null,說明要麼有執行緒正在執行獲取操作,要麼沒有出隊操作在進行
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//當前執行緒已經取完元素了,可以喚醒其他執行緒獲取隊首元素了
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
//指定時間內獲取延遲的隊首元素,若在指定等待時間內隊首延遲時間未到達或佇列為空
//就返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
//佇列是否為空,若為空佇列,那麼在指定等待是否到達,若等待時間也已到達
//那就返回null,若未到達等待時間,就繼續等待
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos); //當前執行緒進入等待時間nanos納秒
} else {
long delay = first.getDelay(NANOSECONDS); //獲取隊首元素的延遲時間
//判斷延遲時間是否到達,到達就直接將隊首元素返回
if (delay <= 0)
return q.poll();
//延遲時間未到,但等待時間已經達到,那麼就返回null
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
//延遲時間小於等待時間,說明可以在等待時間內獲取到隊首元素
//那麼就在等待延遲時間到達的時間內,可以再次嘗試將隊首元素獲取返回
//這裡僅是再次嘗試,因為可能在等待期間內有新的元素入隊,且延遲時間最小成為新隊首
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
//等待時間 > 延遲時間 並且沒有其它執行緒在等待,
//那麼當前元素成為leader,表示當前執行緒最早正在等待獲取元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//讓等待時間到達
long timeLeft = available.awaitNanos(delay);
//繼續等待的時間
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
8.peek方法
//peek方法僅僅就是為底層的優先順序佇列的peek方法加上鎖
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
二、PriorityBlockingQueue併發容器
1.PriorityBlockingQueue的底層實現
PriorityBlockingQueue是一個執行緒安全的無界阻塞佇列,可以看對是PriorityQueue的多執行緒版本,其底層資料結構與PriorityQueue相同,都是陣列實現的利用二叉堆結構。前文已經分析過,這裡不再多說
2.PriorityBlockingQueue的繼承體系
PriorityBlockingQueue的繼承關係如下圖所示,均是之前學習過的父類或介面。這裡不再展開。
3.PriorityBlockingQueue中的重要屬性及構造方法
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//未指定佇列初始容量時使用的預設容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//佇列雖然說是無界的,但實際佇列是不能超過Integer.MAX_VALUE - 8這個值的
//若是超過報OOM異常
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//底層存放資料的陣列
private transient Object[] queue;
//佇列中元素的個數,計數器
private transient int size;
//用於判斷優先順序的比較器,若為null則使用自然排序
private transient Comparator<? super E> comparator;
//重入鎖,保證併發安全
private final ReentrantLock lock;
//佇列非空條件,用於出隊操作
private final Condition notEmpty;
//用於佇列顯示是否處於擴容狀態,0表示沒有在擴容
//而1表示處於擴容狀態,將該值更新成1的執行緒會進行陣列擴容
//其他要進行擴容的執行緒檢查該值發現為1,則直接暫停執行緒讓出CPU
private transient volatile int allocationSpinLock;
//將佇列轉換成執行緒不安全的優先順序佇列,用於序列化
private PriorityQueue<E> q;
//建立一個預設初始容量的佇列
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
//建立一個指定初始容量的佇列
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
//建立一個指定容量和比較器的佇列
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
//以集合c為底,建立一個佇列
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
//根據集合c是哪一種容器來決定建立怎樣的初始佇列
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
}
4.入隊方法
//PriorityBlockingQueue所有的入隊方法,都一樣,因為佇列是無界佇列
//不存在加入佇列失敗的可能,因此最終都是呼叫offer方法
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
public boolean offer(E e) {
//優先順序佇列中不允許存在null元素,因此null元素無法確定優先順序
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap; //n為當前佇列中的元素個數,cap為當前佇列的容量
Object[] array;
//判斷底層陣列是否需要擴容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
//判斷是使用比較器進行上浮操作,還是使用自然排序進行上浮操作
if (cmp == null)
siftUpComparable(n, e, array); //自然排序上浮
else
siftUpUsingComparator(n, e, array, cmp); //比較器上浮
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
//自然上浮,與PriorityQueue中一樣
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1; //獲取父結點索引
Object e = array[parent]; //父結點
//比較插入的值與父結點的大小,若比父結點小,那麼交換位置後,在繼續比較
//若比父結點大,說明排序正確,無需在繼續比較
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
//比較器比較上浮
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
//陣列擴容
private void tryGrow(Object[] array, int oldCap) {
// 擴容時不需要加鎖,因為擴容是通過CAS方式來實現的,
//這樣不僅可以提升效率,並且不影響出隊操作
lock.unlock();
Object[] newArray = null;
//將allocationSpinLock更新成1的執行緒進行陣列擴容操作,其餘要擴容的執行緒暫停
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//擴容規則,容量小於64,擴大2倍+2,容量不小於64,則擴大1.5倍
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
//判斷擴大後的容量是否越界
//若是會越界,則擴容規則改為舊容量+1,若仍越界,報OOM異常
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap]; //建立新陣列
} finally {
allocationSpinLock = 0; //恢復為0,表示沒有在擴容狀態
}
}
if (newArray == null) //未競爭到擴容操作的執行緒暫停
Thread.yield();
lock.lock(); /重新上鎖
if (newArray != null && queue == array) {
queue = newArray;
//將舊陣列中的資料轉移到新陣列中
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
5.出隊方法
//獲取並移除隊首元素,若佇列為空,返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue(); //真正出隊的方法
} finally {
lock.unlock();
}
}
//真正執行獲取並移除隊首元素的方法
private E dequeue() {
int n = size - 1; //移除隊首後佇列中的元素個數 ,同時也是隊尾元素的索引
//判斷佇列是否為空佇列,空佇列直接返回null
if (n < 0)
return null;
else {
Object[] array = queue; //獲取底層陣列引用
E result = (E) array[0]; //獲取隊首元素
E x = (E) array[n]; //獲取隊尾元素
array[n] = null; //隊尾置為null
Comparator<? super E> cmp = comparator;
//將原來佇列的隊尾放到隊首位置,然後進行下沉操作(即二叉堆重新排序的操作)
if (cmp == null)
siftDownComparable(0, x, array, n); //使用自然排序下沉
else
siftDownUsingComparator(0, x, array, n, cmp); //使用比較器下沉
size = n;
return result;
}
}
//下沉操作與PriorityQueue中相同,這裡不在多做分析
//自然排序下沉
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
//比較器下沉
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
//獲取並移除隊首元素,若佇列已空,則等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//若返回的元素為null,說明佇列中沒有元素
//那麼讓當前執行緒進入條件佇列中等待,當前佇列有元素時,則
//會喚醒執行緒,在嘗試獲取並移除隊首
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
//在一定時間內嘗試獲取並移除隊首元素,若在指定時間內未成功,
//返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//嘗試獲取並移除隊首,若失敗但超時時間未到,則進入條件等待
//一段時間後在進行嘗試,若超時時間已過仍為成功獲取並移除隊首
//則返回null
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
//獲取但不移除隊首元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
6.總結
1.PriorityBlocking
Queue是基於陣列實現的二叉堆結構。
2.PriorityBlocking
Queue中涉及到元素之間的比較,因此不能存在null元素。
3.PriorityBlocking
Queue的入隊出隊操作執行緒安全是通過重入鎖ReentrantLock實現的,但在擴容時是基於CAS演算法實現的。
4.PriorityBlocking
Queue是無界佇列,其入隊出隊規則是基於優先順序的,雖然說是無界佇列,但並不是無限大的,容量不能超過
Integer.MAX_VALUE - 8。