JAVA併發程式設計: PriorityQueue -》阻塞佇列 PriorityBlockingQueue
生活
一旦一種新技術開始滾動碾壓道路,如果你不能成為壓路機的一部分,那麼你就只能成為道路的一部分。
PriorityQueue
阻塞佇列裡的PriorityBlockingQueue基於PriorityQueue,所以在研究PriorityBlockingQueue之前要先研究一下PriorityQueue,這是一個有優先順序概念的佇列,是有順序的,他的順序是通過內部的比較器實現。
他的內部維護了一個數組,但不是一個簡單的陣列,其實是通過一個數組,維護一個二叉堆的資料結構。
因此先把二叉堆搞清楚。
啥是二叉堆
研究二叉堆,有必要先把堆的概念搞清楚,注意:這裡指代的堆,並不是java開發中常說的堆疊的那個堆哦。
那麼,堆是一種怎麼樣的資料結構呢?
堆通常是一個可以看成一棵樹的陣列物件,有以下兩個特徵:
1、堆的某個節點,總是不大於或者不小於父節點
2、堆是一顆完全樹。
二叉堆是一種特殊的堆,二叉樹是一顆完全二叉樹或者近似於完全二叉樹。
二叉堆又有兩種,最大堆和最小堆。
最大堆:父節點總是大於等於任何一個子節點
最小堆:父節點總是小於等於任何一個子節點。
來圖解表示下二叉堆
上圖是一個二叉堆(完全二叉樹),他的特點是在N層被填滿之前,不會開始第N+1層的填充,並且填充的順序是從左往右。
二叉堆又可以用陣列來表示:
如下圖
通過上圖可以發現一個規律,使用陣列實現的二叉堆,位置N上的元素,其左孩子在2N+1處,其右孩子在2N+2處,根節點是0。
由於ProtityQueue是一個有優先順序概念的佇列,因此可以使用二叉堆來實現,佇列的入隊和出隊,也可以通過二叉堆來實現,對應到二叉堆就是他的上移和下移,下面來圖解一下 二叉堆的上移和下移。
這些圖都是盜來的。。自己繪畫水平太差了~~
二叉堆的上移
下面圖解描述一下如何向二叉堆新增一個元素:
圖1是一個二叉堆 最小堆(完全二叉樹)
圖2在二叉樹的最後插入一個節點2
圖3 由於圖2中2的父節點6比它大,所以2和6交換
圖4 由於2的父節點5比它,所以2和5交換,
此時又是一個標準的二叉堆。
二叉堆的下移
下面來看下如何把二叉堆中第一個元素移出,即優先佇列中的出隊操作。
圖1一個二叉堆
圖2準備出隊最小元素1,先把最後一個元素8所在的位置刪除
圖3 最小節點1下兩個孩子節點,取最小節點3交換
圖4 最小節點1下兩個孩子節點,去最小節點4交換
此時根元素1的最小孩子節點9比8還大,直接用8覆蓋1。1即被刪除了。
PriorityQueue的建立
//容器陣列
transient Object[] queue;
//構造器很多,總的來說指定容量和比較器,如果沒有指定比較器,要求存入的元素必須實現了Compareable介面,如果沒有指定容量預設11
public PriorityQueue() {
//預設11
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityQueue(Comparator<? super E> comparator) {
this(DEFAULT_INITIAL_CAPACITY, comparator);
}
public PriorityQueue(int initialCapacity,
Comparator<? super E> comparator) {
// Note: This restriction of at least one is not actually needed,
// but continues for 1.5 compatibility
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.queue = new Object[initialCapacity];
this.comparator = comparator;
}
@SuppressWarnings("unchecked")
public PriorityQueue(Collection<? extends E> c) {
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
initElementsFromCollection(ss);
}
else if (c instanceof PriorityQueue<?>) {
PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
initFromPriorityQueue(pq);
}
else {
this.comparator = null;
initFromCollection(c);
}
}
@SuppressWarnings("unchecked")
public PriorityQueue(PriorityQueue<? extends E> c) {
this.comparator = (Comparator<? super E>) c.comparator();
initFromPriorityQueue(c);
}
@SuppressWarnings("unchecked")
public PriorityQueue(SortedSet<? extends E> c) {
this.comparator = (Comparator<? super E>) c.comparator();
initElementsFromCollection(c);
}
PriorityQueue的入隊
public boolean offer(E e) {
//元素為空就報錯
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
//容量不夠就擴容
if (i >= queue.length)
//擴容
grow(i + 1);
size = i + 1;
//當是第一個元素時,就不用去比較只需要放在根節點即可
if (i == 0)
queue[0] = e;
else
//上移
siftUp(i, e);
return true;
}
//擴容
private void grow(int minCapacity) {
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
//如果原長度小於64,則設定新容量 原來+原來的+2
//否則 原來+原來/2
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
queue = Arrays.copyOf(queue, newCapacity);
}
//上移程式碼
//根據是否有比較器選擇不一樣的方法
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
// 比較自己與父節點,比較不滿足條件就互換
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
@SuppressWarnings("unchecked")
private void siftUpUsingComparator(int k, E x) {
//同上
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
PriorityQueue的出隊
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
//取出第一個元素return出去
E result = (E) queue[0];
//取出最後一個元素
E x = (E) queue[s];
//把最後一個元素的位置置空
queue[s] = null;
if (s != 0)
siftDown(0, x);
return result;
}
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
//之所以迴圈一半,是因為超過一半的 2n+1就超過這個陣列長度,也就是沒有子節點了。
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
//比較兩個孩子節點,取最小或者最大的交換
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
//把最後一個元素放置在對應的位置上
queue[k] = key;
}
@SuppressWarnings("unchecked")
private void siftDownUsingComparator(int k, E x) {
//同上
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
PriorityQueue的heapify
回到上面建立優先佇列的構造器中,有一些是直接通過一個集合來建立,這裡的方法是先把集合裡的元素轉成一個數組,然後通過heapify方法把這個陣列變成一個標準的二叉堆,本質的實現原理就是下移:
private void heapify() {
for (int i = (size >>> 1) - 1; i >= 0; i--)
siftDown(i, (E) queue[i]);
}
PriorityBlockingQueue
PriorityBlockingQueue無非就死在PriorityQueue的基礎上加上鎖和條件變數
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
注意到條件變數僅有notEmpty,用以阻塞當佇列為空時的出隊執行緒。
為啥沒有notFull,因為本身有擴容的操作。所以不存在容量上限的情況。
案例程式碼
public class PBQTest {
public static void main(String[] args) {
PriorityBlockingQueue queue = new PriorityBlockingQueue();
new Thread(new PBQ(queue,new Student("小明",20))).start();
new Thread(new PBQ(queue,new Student("小紅",17))).start();
new Thread(new PBQ(queue,new Student("小剛",25))).start();
new Thread(new PBQ(queue,new Student("小慌",31))).start();
new Thread(new PBQ(queue,null)).start();
new Thread(new PBQ(queue,null)).start();
new Thread(new PBQ(queue,null)).start();
new Thread(new PBQ(queue,null)).start();
}
static class PBQ implements Runnable{
private PriorityBlockingQueue queue;
private Student student;
@Override
public void run() {
if(student == null){
try {
Thread.sleep(Long.valueOf(new Random().nextInt(5000)));
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else{
queue.offer(student);
}
}
public PBQ(PriorityBlockingQueue queue, Student student) {
this.queue = queue;
this.student = student;
}
}
static class Student implements Comparable<Student>{
@Override
public int compareTo(Student o) {
return this.age>o.age?1:0;
}
private String name;
private int age;
public Student(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return String.format("name:%s,age:%s",name,age);
}
}
}
name:小紅,age:17
name:小明,age:20
name:小剛,age:25
name:小慌,age:31