併發佇列之PriorityBlockingQueue
這一篇說一下PriorityBlockingQueue,引用書中的一句話:這就是帶優先順序的無界阻塞佇列,每次出隊都返回優先順序最高或者最低的元素(這裡規則可以自己制定),內部是使用平衡二叉樹實現的,遍歷不保證有序;
其實也比較容易,就是基於陣列實現的一個平衡二叉樹,不瞭解平衡二叉樹的可以先了解一下,別想的太難,原理跟連結串列差不多,只不過連結串列中指向下一個節點的只有一個,而平衡二叉樹中有兩個,一個左,一個右,還有左邊的節點的值小於當前節點的值,右邊節點的值大於當前節點的值;看看平衡二叉樹的增刪改查即可;
一.認識PriorityBlockingQueue
底層是以陣列實現的,我們看看幾個重要的屬性:
//佇列預設初始化容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //陣列最大容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //底層實現還是陣列 private transient Object[] queue; //佇列容量 private transient int size; //一個比較器,比較元素大小 private transient Comparator<? super E> comparator; //一個獨佔鎖,控制同時只有一個執行緒在入隊和出隊 private final ReentrantLock lock; //如果佇列是空的,還有執行緒來佇列取資料,就阻塞 //這裡只有一個條件變數,因為這個佇列是無界的,向佇列中插入資料的話就用CAS操作就行了 private final Condition notEmpty; //一個自旋鎖,CAS使得同時只有一個執行緒可以進行擴容,0表示沒有進行擴容,1表示正在進行擴容 private transient volatile int allocationSpinLock;
簡單看看構造器:
//預設陣列大小是11 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //可以指定陣列大小 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } //初始化陣列、鎖、條件變數還有比較器 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } //這個構造器也可以傳入一個集合 public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify(); }
有興趣的可以看看下面這個圖,說的更詳細,個人覺得看重要的地方就行了;
二.offer方法
在佇列中插入一個元素,由於是無界佇列,所以一直返回true;
public boolean offer(E e) { //如果傳入的是null,就拋異常 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; //獲取鎖 lock.lock(); int n, cap; Object[] array; //[1]當前陣列中實際資料總數>=陣列容量,就進行擴容 while ((n = size) >= (cap = (array = queue).length)) //擴容 tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //[2]預設比較器為空時 if (cmp == null) siftUpComparable(n, e, array); else //[3]預設比較器不為空就用我們傳進去的預設比較器 siftUpUsingComparator(n, e, array, cmp); //陣列實際數量加一 size = n + 1; //喚醒notEmpty條件佇列中的執行緒 notEmpty.signal(); } finally { //釋放鎖 lock.unlock(); } return true; }
上面的程式碼中,我們就關注那三個地方就行了,首先是[1]中擴容:
private void tryGrow(Object[] array, int oldCap) { //首先釋放獲取的鎖,這裡不釋放也行,只是擴容有的時候很慢,需要花時間,此時入隊和出隊操作就不能進行了,極大地降低了併發性 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; //自旋鎖為0表示佇列此時沒有進行擴容,然後用CAS將自旋鎖從0該為1 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //用這個演算法確定擴容後的陣列容量,可以看到如果當前陣列容量小於64,新陣列容量就是2n+2,大於64,新的容量就是3n/2 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); //判斷新的陣列容量是不是超過了最大容量,如果超過了,就嘗試在老的陣列容量加一,如果還是大於最大容量,就拋異常了 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { //擴容完畢就將自旋鎖變為0 allocationSpinLock = 0; } } //第一個執行緒在上面的if中執行CAS成功之後,第二個執行緒就會到這裡,然後執行yield方法讓出CPU,儘量讓第一個執行緒執行完畢; if (newArray == null) // back off if another thread is allocating Thread.yield(); //前面釋放鎖了,這裡要獲取鎖 lock.lock(); //將原來的陣列中的元素複製到新陣列中 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
再看[2]中的預設的比較器:
//這裡k表示陣列中實際數量,x表示要插入到陣列中的資料,array表示存放資料的陣列 private static <T> void siftUpComparable(int k, T x, Object[] array) { //由此可知,我們要放進陣列中的資料型別,必須要是實現了Comparable介面的 Comparable<? super T> key = (Comparable<? super T>) x; //這裡判斷陣列中有沒有資料,第一次插入資料的時候,k=0,不滿足這個迴圈條件,那就直接走最下面設定array[0] = key //滿足這個條件的話,首先獲取父節點的索引,然後取出值,再比較該值和需要插入值的大小,決定是跳出迴圈還是繼續迴圈 //這裡比較重要,這個迴圈就是不斷的調整二叉樹平衡的,下面我們畫圖看看 while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; }
隨便舉個例子看看怎麼把平衡二叉樹中的元素放到陣列中,節點中的資料型別就以Integer了,其實就是將每一層從做到右一次放到陣列中存起來,很明顯,在陣列中不是從小到大的順序的;
這裡注意一點,平衡二叉樹的存放順序不是唯一的,有很多種情況,跟你的存放順序有關!
所以我們看看siftUpComparable方法中的while迴圈是怎麼進行的?假設第一次呼叫offer(3),也就是呼叫siftUpComparable(0,3,array),這裡假設array有足夠的大小,不考慮擴容,那麼第一次會走到while迴圈後面執行array[0]=3,下圖所示:
第二次呼叫offer(1),也就是呼叫siftUpComparable(1,1,array),k=1,parent=0,所以父節點此時應該是3,然後1<3,不滿足if語句,設定array[1]=3,k=0,然後繼續迴圈不滿足條件,執行array[0]=1,下圖所示:
第三次呼叫offer(7),也就是呼叫siftUpComparable(2,7,array),k=2,parent=0,父節點為索引0的位置也就是1,因為7>1滿足if語句,所以break跳出迴圈,執行array[2]=7,下圖所示:
第四次呼叫offer(2),也就是呼叫siftUpComparable(3,2,array),k=3,parent=(k-1)>>>1=1,所以父節點表示索引為1的位置,也就是3,因為2<3,不滿足if語句,所以設定array[3]=3,k=1,再進行一次迴圈,parent=0,此時父節點的值是1,2<3,不滿足if,設定array[1]=1,k=0;再繼續迴圈不滿足迴圈條件,跳出迴圈,設定array[0] = 2
還是很容易的,有興趣的話再多試試新增幾個節點啊!其實還有[3]中使用我們自定義的比較器進行比較,其實i和上面程式碼一樣的,另外put方法就是呼叫的offer方法,這裡就不多說了
三.poll方法
poll方法的作用是獲取並刪除佇列內部二叉樹的根節點,如果佇列為空,就返回nul;
public E poll() { final ReentrantLock lock = this.lock; //獲取獨佔鎖,說明此時不能有其他執行緒進行入隊和出隊操作,但是可以進行擴容 lock.lock(); try { //獲取並刪除根節點,方法如下 return dequeue(); } finally { //釋放獨佔鎖 lock.unlock(); } } //這個方法可以好好看看,很有意思 private E dequeue() { int n = size - 1; //如果佇列為空,就返回null if (n < 0) return null; else { //否則就先取到陣列 Object[] array = queue; //取到第0個元素,這個也就是要返回的根節點 E result = (E) array[0]; //獲取佇列實際數量的最後一個元素,並把該位置賦值為null E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) //預設的比較器,這裡是真正的移除根節點,然後調整在整個平衡二叉樹,使得達到平衡 siftDownComparable(0, x, array, n); else //我們傳入的自定義比較器 siftDownUsingComparator(0, x, array, n, cmp); //然後數量減一 size = n; //返回根節點 return result; } } private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; //[1] int half = n >>> 1; // loop while a non-leaf //[2] while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; //[3] if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; //[4] if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
所以我們主要的是看看siftDownComparable方法中是怎麼將一個去掉了根節點的平衡二叉樹調整平衡的;比如現在有如下所示的平衡二叉樹:
呼叫poll方法,先是把最後一個元素儲存起來x=3,然後將最後一個位置設定為null,此時實際呼叫的是siftDownComparable(0,3,array,3),key=3,half=1,k=0,n=3,滿足[2],於是child=1,c=1,right=2,不滿足[3],不滿足[4],設定array[0]=1,k=1;繼續迴圈,不滿足迴圈條件,跳出迴圈,直接設定array[1]=3,最後poll方法返回的時2,下圖所示:
其實可以簡單的說說,最開始將陣列中最後一個值X儲存起來在適當時機插入到二叉樹中,什麼時候是適當時機呢?首先去掉根節點之後,得到根節點左子節點和右子節點的值leftVal和rightVal,如果X比leftVal小,那就直接把X放入到根節點的位置,整個平衡二叉樹就平衡了!如果X比leftVal大,那就將leftVal的值設定到根節點中,再以左子節點做遞迴,繼續比較X和左子節點的左節點的大小!仔細看看也沒啥。
四.take方法
這個方法作用是獲取二叉樹中的根節點,也就是陣列的第一個節點,佇列為空,就阻塞;
public E take() throws InterruptedException { //獲取鎖,可中斷 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //如果二叉樹為空了,那麼dequeue方法就會返回null,然後這裡就會阻塞 while ( (result = dequeue()) == null) notEmpty.await(); } finally { //釋放鎖 lock.unlock(); } return result; } //這個方法前面說過,就是刪除根節點,然後調整平衡二叉樹 private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
五.一個簡單的例子
前面看了這個多方法,那就說說怎麼使用吧,看看PriorityBlockingQueue這個阻塞佇列怎麼使用;
package com.example.demo.study; import java.util.Random; import java.util.concurrent.PriorityBlockingQueue; import lombok.Data; public class Study0208 { @Data static class MyTask implements Comparable<MyTask>{ private int priority=0; private String taskName; @Override public int compareTo(MyTask o) { if (this.priority>o.getPriority()) { return 1; } return -1; } } public static void main(String[] args) { PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<MyTask>(); Random random = new Random(); //往佇列中放是個任務,從TaskName是按照順序放進去的,優先順序是隨機的 for (int i = 1; i < 11; i++) { MyTask task = new MyTask(); task.setPriority(random.nextInt(10)); task.setTaskName("taskName"+i); queue.offer(task); } //從佇列中取出任務,這裡是按照優先順序去拿出來的,相當於是根據優先順序做了一個排序 while(!queue.isEmpty()) { MyTask pollTask = queue.poll(); System.out.println(pollTask.toString()); } } }
&n