1. 程式人生 > >併發佇列之PriorityBlockingQueue

併發佇列之PriorityBlockingQueue

  這一篇說一下PriorityBlockingQueue,引用書中的一句話:這就是帶優先順序的無界阻塞佇列,每次出隊都返回優先順序最高或者最低的元素(這裡規則可以自己制定),內部是使用平衡二叉樹實現的,遍歷不保證有序;

  其實也比較容易,就是基於陣列實現的一個平衡二叉樹,不瞭解平衡二叉樹的可以先了解一下,別想的太難,原理跟連結串列差不多,只不過連結串列中指向下一個節點的只有一個,而平衡二叉樹中有兩個,一個左,一個右,還有左邊的節點的值小於當前節點的值,右邊節點的值大於當前節點的值;看看平衡二叉樹的增刪改查即可;

 

一.認識PriorityBlockingQueue

  底層是以陣列實現的,我們看看幾個重要的屬性:

//佇列預設初始化容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//陣列最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//底層實現還是陣列
private transient Object[] queue;
//佇列容量
private transient int size;
//一個比較器,比較元素大小
private transient Comparator<? super E> comparator;
//一個獨佔鎖,控制同時只有一個執行緒在入隊和出隊
private final ReentrantLock lock;
//如果佇列是空的,還有執行緒來佇列取資料,就阻塞
//這裡只有一個條件變數,因為這個佇列是無界的,向佇列中插入資料的話就用CAS操作就行了
private final Condition notEmpty;
//一個自旋鎖,CAS使得同時只有一個執行緒可以進行擴容,0表示沒有進行擴容,1表示正在進行擴容
private transient volatile int allocationSpinLock;

 

 

  簡單看看構造器:

//預設陣列大小是11
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
//可以指定陣列大小
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
//初始化陣列、鎖、條件變數還有比較器
public PriorityBlockingQueue(int initialCapacity,
                                Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
//這個構造器也可以傳入一個集合
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)
        heapify();
}

 

  有興趣的可以看看下面這個圖,說的更詳細,個人覺得看重要的地方就行了;

 

二.offer方法

  在佇列中插入一個元素,由於是無界佇列,所以一直返回true;

public boolean offer(E e) {
    //如果傳入的是null,就拋異常
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    //獲取鎖
    lock.lock();
    int n, cap;
    Object[] array;
    //[1]當前陣列中實際資料總數>=陣列容量,就進行擴容
    while ((n = size) >= (cap = (array = queue).length))
        //擴容
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        //[2]預設比較器為空時
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
        //[3]預設比較器不為空就用我們傳進去的預設比較器
            siftUpUsingComparator(n, e, array, cmp);
        //陣列實際數量加一
        size = n + 1;
        //喚醒notEmpty條件佇列中的執行緒
        notEmpty.signal();
    } finally {
        //釋放鎖
        lock.unlock();
    }
    return true;
}

  

  上面的程式碼中,我們就關注那三個地方就行了,首先是[1]中擴容:

private void tryGrow(Object[] array, int oldCap) {
    //首先釋放獲取的鎖,這裡不釋放也行,只是擴容有的時候很慢,需要花時間,此時入隊和出隊操作就不能進行了,極大地降低了併發性
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    //自旋鎖為0表示佇列此時沒有進行擴容,然後用CAS將自旋鎖從0該為1
    if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
        try {
            //用這個演算法確定擴容後的陣列容量,可以看到如果當前陣列容量小於64,新陣列容量就是2n+2,大於64,新的容量就是3n/2
            int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
            //判斷新的陣列容量是不是超過了最大容量,如果超過了,就嘗試在老的陣列容量加一,如果還是大於最大容量,就拋異常了
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            //擴容完畢就將自旋鎖變為0
            allocationSpinLock = 0;
        }
    }
    //第一個執行緒在上面的if中執行CAS成功之後,第二個執行緒就會到這裡,然後執行yield方法讓出CPU,儘量讓第一個執行緒執行完畢;
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    //前面釋放鎖了,這裡要獲取鎖
    lock.lock();
    //將原來的陣列中的元素複製到新陣列中
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

 

  再看[2]中的預設的比較器:

//這裡k表示陣列中實際數量,x表示要插入到陣列中的資料,array表示存放資料的陣列
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    //由此可知,我們要放進陣列中的資料型別,必須要是實現了Comparable介面的
    Comparable<? super T> key = (Comparable<? super T>) x;
    //這裡判斷陣列中有沒有資料,第一次插入資料的時候,k=0,不滿足這個迴圈條件,那就直接走最下面設定array[0] = key
    //滿足這個條件的話,首先獲取父節點的索引,然後取出值,再比較該值和需要插入值的大小,決定是跳出迴圈還是繼續迴圈
    //這裡比較重要,這個迴圈就是不斷的調整二叉樹平衡的,下面我們畫圖看看
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

 

  

  隨便舉個例子看看怎麼把平衡二叉樹中的元素放到陣列中,節點中的資料型別就以Integer了,其實就是將每一層從做到右一次放到陣列中存起來,很明顯,在陣列中不是從小到大的順序的;

  這裡注意一點,平衡二叉樹的存放順序不是唯一的,有很多種情況,跟你的存放順序有關!

   

  所以我們看看siftUpComparable方法中的while迴圈是怎麼進行的?假設第一次呼叫offer(3),也就是呼叫siftUpComparable(0,3,array),這裡假設array有足夠的大小,不考慮擴容,那麼第一次會走到while迴圈後面執行array[0]=3,下圖所示:

    

  第二次呼叫offer(1),也就是呼叫siftUpComparable(1,1,array),k=1,parent=0,所以父節點此時應該是3,然後1<3,不滿足if語句,設定array[1]=3,k=0,然後繼續迴圈不滿足條件,執行array[0]=1,下圖所示:

  

   第三次呼叫offer(7),也就是呼叫siftUpComparable(2,7,array),k=2,parent=0,父節點為索引0的位置也就是1,因為7>1滿足if語句,所以break跳出迴圈,執行array[2]=7,下圖所示:

 

  第四次呼叫offer(2),也就是呼叫siftUpComparable(3,2,array),k=3,parent=(k-1)>>>1=1,所以父節點表示索引為1的位置,也就是3,因為2<3,不滿足if語句,所以設定array[3]=3,k=1,再進行一次迴圈,parent=0,此時父節點的值是1,2<3,不滿足if,設定array[1]=1,k=0;再繼續迴圈不滿足迴圈條件,跳出迴圈,設定array[0] = 2

 

  還是很容易的,有興趣的話再多試試新增幾個節點啊!其實還有[3]中使用我們自定義的比較器進行比較,其實i和上面程式碼一樣的,另外put方法就是呼叫的offer方法,這裡就不多說了

 

三.poll方法

   poll方法的作用是獲取並刪除佇列內部二叉樹的根節點,如果佇列為空,就返回nul;

public E poll() {
    final ReentrantLock lock = this.lock;
    //獲取獨佔鎖,說明此時不能有其他執行緒進行入隊和出隊操作,但是可以進行擴容
    lock.lock();
    try {
        //獲取並刪除根節點,方法如下
        return dequeue();
    } finally {
        //釋放獨佔鎖
        lock.unlock();
    }
}

//這個方法可以好好看看,很有意思
private E dequeue() {
    int n = size - 1;
    //如果佇列為空,就返回null
    if (n < 0)
        return null;
    else {
        //否則就先取到陣列
        Object[] array = queue;
        //取到第0個元素,這個也就是要返回的根節點
        E result = (E) array[0];
        //獲取佇列實際數量的最後一個元素,並把該位置賦值為null
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            //預設的比較器,這裡是真正的移除根節點,然後調整在整個平衡二叉樹,使得達到平衡
            siftDownComparable(0, x, array, n);
        else
            //我們傳入的自定義比較器
            siftDownUsingComparator(0, x, array, n, cmp);
        //然後數量減一
        size = n;
        //返回根節點
        return result;
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        //[1]
        int half = n >>> 1;           // loop while a non-leaf
        //[2]
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            //[3]
            if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            //[4]
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

 

 

  所以我們主要的是看看siftDownComparable方法中是怎麼將一個去掉了根節點的平衡二叉樹調整平衡的;比如現在有如下所示的平衡二叉樹:

 

 

  呼叫poll方法,先是把最後一個元素儲存起來x=3,然後將最後一個位置設定為null,此時實際呼叫的是siftDownComparable(0,3,array,3),key=3,half=1,k=0,n=3,滿足[2],於是child=1,c=1,right=2,不滿足[3],不滿足[4],設定array[0]=1,k=1;繼續迴圈,不滿足迴圈條件,跳出迴圈,直接設定array[1]=3,最後poll方法返回的時2,下圖所示:

 

 

   其實可以簡單的說說,最開始將陣列中最後一個值X儲存起來在適當時機插入到二叉樹中,什麼時候是適當時機呢?首先去掉根節點之後,得到根節點左子節點和右子節點的值leftVal和rightVal,如果X比leftVal小,那就直接把X放入到根節點的位置,整個平衡二叉樹就平衡了!如果X比leftVal大,那就將leftVal的值設定到根節點中,再以左子節點做遞迴,繼續比較X和左子節點的左節點的大小!仔細看看也沒啥。

 

四.take方法

  這個方法作用是獲取二叉樹中的根節點,也就是陣列的第一個節點,佇列為空,就阻塞;

public E take() throws InterruptedException {
    //獲取鎖,可中斷
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        //如果二叉樹為空了,那麼dequeue方法就會返回null,然後這裡就會阻塞
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        //釋放鎖
        lock.unlock();
    }
    return result;
}
//這個方法前面說過,就是刪除根節點,然後調整平衡二叉樹
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

 

 

五.一個簡單的例子

  前面看了這個多方法,那就說說怎麼使用吧,看看PriorityBlockingQueue這個阻塞佇列怎麼使用;

package com.example.demo.study;

import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;

import lombok.Data;

public class Study0208 {
    
    @Data
    static class MyTask implements Comparable<MyTask>{
        private int priority=0;
        
        private String taskName;
        
        @Override
        public int compareTo(MyTask o) {
            if (this.priority>o.getPriority()) {
                return 1;
            }
            return -1;
        }    
    }
    
    public static void main(String[] args) {
        PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<MyTask>();
        Random random = new Random();
        //往佇列中放是個任務,從TaskName是按照順序放進去的,優先順序是隨機的
        for (int i = 1; i < 11; i++) {
            MyTask task = new MyTask();
            task.setPriority(random.nextInt(10));
            task.setTaskName("taskName"+i);
            queue.offer(task);
        }
        
        //從佇列中取出任務,這裡是按照優先順序去拿出來的,相當於是根據優先順序做了一個排序
        while(!queue.isEmpty()) {
            MyTask pollTask = queue.poll();
            System.out.println(pollTask.toString());
        }
        
    }

}

&n