Java常見集合框架(十六):Queue之DelayQueue、PriorityQueue、PriorityBlockingQueue
DelayQueue
public class DelayQueue extends AbstractQueue implements BlockingQueue
- Delayed 元素的一個基於優先順序的無界阻塞佇列,只有在延遲期滿時才能從中提取元素。
- 如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。
- 不允許使用 null 元素。
成員變數
/**
* 可重入的互斥鎖
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
* 一個基於優先順序堆的無界優先順序佇列。可自然排序。不允許使用 null 元素。
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* 喚醒或等待take執行緒的條件
*/
private final Condition available = lock.newCondition();
構造方法
/**
* 建立一個最初為空的新 DelayQueue。
*/
public DelayQueue() {}
/**
* 建立一個最初包含 Delayed 例項的給定 collection 元素的 DelayQueue。
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
常用方法
boolean add(E e):將指定元素插入此延遲佇列中。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e):將指定元素插入此延遲佇列。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式獲取鎖
try {
E first = q.peek();//獲取但不移除此佇列的頭部;如果此佇列為空,則返回 null。
q.offer(e);//將指定的元素插入此優先順序佇列。
if (first == null || e.compareTo(first) < 0)
available.signalAll();//佇列中無元素,喚醒take執行緒
return true;
} finally {
lock.unlock();//釋放鎖
}
}
E peek():獲取但不移除此佇列的頭部;如果此佇列為空,則返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式獲取鎖
try {
return q.peek();// 獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
} finally {
lock.unlock();//釋放鎖
}
}
E poll(): 獲取並移除此佇列的頭,如果此佇列不包含具有已到期延遲時間的元素,則返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();// 獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
//getDelay:返回與此物件相關的剩餘延遲時間,以給定的時間單位表示。
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;//佇列為空或元素未到期
else {
E x = q.poll();//獲取並移除此佇列的頭,如果此佇列為空,則返回 null。
assert x != null;//非空校驗
if (q.size() != 0)//佇列中還有元素
available.signalAll();//喚醒take執行緒
return x;
}
} finally {
lock.unlock();
}
}
void put(E e):將指定元素插入此延遲佇列。
public void put(E e) {
offer(e);
}
E take():獲取並移除此佇列的頭部,在可從此佇列獲得到期延遲的元素之前一直等待(如有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞式獲取鎖,可響應執行緒中斷
try {
for (;;) {
E first = q.peek();//獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
if (first == null) {
available.await();//佇列為空,執行緒等待,釋放鎖
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);//獲取元素剩餘到期市場,並判斷是否到期
if (delay > 0) {
long tl = available.awaitNanos(delay);//元素還未到期,執行緒等待指定時間,釋放鎖
} else {
E x = q.poll();//獲取並移除此佇列的頭,如果此佇列為空,則返回 null。
assert x != null;//非空校驗
if (q.size() != 0)
available.signalAll(); // 執行緒非空,喚醒其它所有take執行緒 return x;//返回隊頭元素
}
}
}
} finally {
lock.unlock();//釋放鎖
}
}
由原始碼看出延遲佇列DelayQueue操作元素是通過PriorityQueue實現的,PriorityQueue是一個基於優先順序堆的無界優先順序佇列。利用可重入的互斥鎖ReentrantLock保證執行緒安全,同時利用Condition保證插入或獲取元素是阻塞的。
PriorityQueue
public class PriorityQueue extends AbstractQueue implements java.io.Serializable
- 一個基於優先順序堆的無界優先順序佇列。
- 優先順序佇列不允許使用 null 元素。
- 預設容量11,當元素容量小於64時,擴容double,否則擴容50%。
- 不是同步的。
成員變數
/**
* 初始容量
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 元素以平衡二叉樹形式儲存
*/
private transient Object[] queue;
/**
* 優先順序佇列元素數
*/
private int size = 0;
/**
* 元素自然排序方式
*/
private final Comparator<? super E> comparator;
/**
* 優先順序佇列修改次數
*/
private transient int modCount = 0;
構造方法
/**
* 使用預設的初始容量(11),並根據其自然順序對元素進行排序。
*/
public PriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* 使用指定的初始容量建立Queue,並根據指定的比較器對元素進行排序。
*/
public PriorityQueue(int initialCapacity,
Comparator<? super E> comparator) {
// Note: This restriction of at least one is not actually needed,
// but continues for 1.5 compatibility
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.queue = new Object[initialCapacity];
this.comparator = comparator;
}
常用方法
boolean add(E e):將指定的元素插入此優先順序佇列。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e): 將指定的元素插入此優先順序佇列。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;//修改次數+1
int i = size;
if (i >= queue.length)
grow(i + 1);//元素數可能不夠,需要擴容
size = i + 1;
if (i == 0)//佇列為空
queue[0] = e;
else
siftUp(i, e);
return true;
}
/**
* 佇列擴容
*/
private void grow(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
int oldCapacity = queue.length;//獲取當前元素數
// <64 雙倍擴容; >=64 擴容 50%
int newCapacity = ((oldCapacity < 64)?
((oldCapacity + 1) * 2):
((oldCapacity / 2) * 3));
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;//最大邊界
if (newCapacity < minCapacity)
newCapacity = minCapacity;
queue = Arrays.copyOf(queue, newCapacity);//陣列複製
}
//選擇排序方式並插入相應位置
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
/**
* Comparable 方式排序
*/
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
/**
* Comparator 方式排序
*/
private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
E peek(): 獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
public E peek() {
if (size == 0)
return null;
return (E) queue[0];
}
E poll() :獲取並移除此佇列的頭,如果此佇列為空,則返回 null。
public E poll() {
if (size == 0)
return null;
int s = --size;//更新元素數
modCount++;//修改次數+1
E result = (E) queue[0];//獲取佇列頭部元素
E x = (E) queue[s];//獲取尾部元素
queue[s] = null;//末尾置空
if (s != 0)//佇列中還有元素
siftDown(0, x);
return result;
}
/**
* 元素重新排序
*/
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
/**
* Comparable方式重新排序
*/
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
/**
* Comparator方式重新排序
*/
private void siftDownUsingComparator(int k, E x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
由原始碼看出,PriorityQueue是非執行緒安全的,利用Comparator或Comparable進行自然排序,從而實現有優先順序的佇列,以平衡二叉樹的形式儲存在transient Object[] queue中,雖然api中介紹說是無界佇列,但從原始碼看出其實是有邊界的 ,值為Integer.MAX_VALUE;只是邊界特別大,從某種程度上來說,可以理解為無邊界。
PriorityBlockingQueue
public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable
- 一個無界阻塞佇列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞獲取操作。
- 此佇列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗(導致 OutOfMemoryError)。
- 不允許使用 null 元素。
- 不允許插入不可比較的物件。
成員變數
//元素操作均基於PriorityQueue
private final PriorityQueue<E> q;
//可重入的互斥鎖,該處採用公平鎖
private final ReentrantLock lock = new ReentrantLock(true);
//take條件
private final Condition notEmpty = lock.newCondition();
構造方法
/**
* 用預設的初始容量 (11) 建立一個 PriorityBlockingQueue,並根據元素的自然順序對其元素進行排序。
*/
public PriorityBlockingQueue() {
q = new PriorityQueue<E>();
}
/**
* 使用指定的初始容量建立一個 PriorityBlockingQueue,並根據指定的比較器對其元素進行排序。
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
q = new PriorityQueue<E>(initialCapacity, comparator);
}
/**
* 使用指定的初始容量建立一個 PriorityBlockingQueue,並根據元素的自然順序對其元素進行排序。
*/
public PriorityBlockingQueue(int initialCapacity) {
q = new PriorityQueue<E>(initialCapacity, null);
}
/**
* 建立一個包含指定 collection 元素的 PriorityBlockingQueue。
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
q = new PriorityQueue<E>(c);
}
常用方法
boolean add(E e):將指定元素插入此優先順序佇列。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e):將指定元素插入此優先順序佇列。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式獲取鎖
try {
boolean ok = q.offer(e);//插入元素
assert ok;
notEmpty.signal();//喚醒take執行緒
return true;
} finally {
lock.unlock();//釋放鎖
}
}
E peek():獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式獲取鎖
try {
return q.peek();//獲取元素
} finally {
lock.unlock();//釋放鎖
}
}
E poll():獲取並移除此佇列的頭,如果此佇列為空,則返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式獲取鎖
try {
return q.poll();//獲取並移除此佇列的頭
} finally {
lock.unlock();//釋放鎖
}
}
E take():獲取並移除此佇列的頭部,在元素變得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞式獲取鎖,可響應執行緒中斷
try {
try {
while (q.size() == 0)
notEmpty.await();//佇列中無元素,阻塞等待,釋放鎖
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = q.poll();//有元素,獲取並移除元素
assert x != null;
return x;//返回元素
} finally {
lock.unlock();//釋放鎖
}
}
void put(E e):將指定元素插入此優先順序佇列。
public void put(E e) {
offer(e); // never need to block
}
有原始碼看出,PriorityBlockingQueue是基於PriorityQueue實現具有優先順序的無界阻塞佇列,利用ReentrantLock實現執行緒安全,Condition實現阻塞。
DelayQueue及PriorityBlockingQueue實現具有優先順序的無界阻塞佇列都是基於PriorityQueue的,區別在於DelayQueue加入了延遲概念。雖說都是無界,但最大邊界為:Integer.MAX_VALUE。