1. 程式人生 > >併發容器學習—DelayQueue與PriorityBlockingQueue

併發容器學習—DelayQueue與PriorityBlockingQueue

一、DelayQueue併發容器 1.Delay Queue的底層實現     Delay Queue是一個執行緒安全且無界的阻塞佇列,只有在延遲時間滿足後才能獲取佇列中的元素,因此佇列中的元素必須實現Delay介面,在建立元素時指定多久時間後才能從佇列中獲取該元素。Delay Queue的底層實現是使用了PriorityQueue+ReentrantLock來實現延遲獲取功能。   2.PriorityQueue分析     其中PriorityQueue是種優先順序佇列,執行緒不安全,佇列中的元素會按照優先順序來排序。該佇列底層實現是使用二叉堆,並且元素按照其自然順序進行排序,或者根據構造佇列時提供的Comparator進行排序。因為PriorityQueue中的元素都要進行比較,所以優先順序佇列中不能擁有null元素,也不能有不能比較的元素。     PriorityQueue的繼承關係如下圖:
    PriorityQueue中的屬性及構造方法:
public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {
    
    //佇列的預設容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    //底層用於存放資料的陣列
    transient Object[] queue; // non-private to simplify nested class access

    //佇列中的元素數量計數
    private int size = 0;

    //比較器
    private final Comparator<? super E> comparator;

    //快速失敗機制使用的變數
    transient int modCount = 0; 

    //建立一個預設容量的佇列
    public PriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    //建立一個指定容量的佇列
    public PriorityQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    //建立一個指定比較器的預設容量佇列
    public PriorityQueue(Comparator<? super E> comparator) {
        this(DEFAULT_INITIAL_CAPACITY, comparator);
    }

    //建立一個指定比較器且指定容量佇列
    public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        //判斷指定的容量值是否合法
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];    //初始化底層陣列
        this.comparator = comparator;    //比較器初始化
    }

    //建立一個帶有指定集合中的元素的佇列
    @SuppressWarnings("unchecked")
    public PriorityQueue(Collection<? extends E> c) {

        //判斷c是否是有序集合
        //若是有序集合,那麼就以其比較器作為佇列的比較器
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            initElementsFromCollection(ss);
        }

        //判斷集合是否是優先順序佇列
        //若是的話,直接使用該佇列的比較器,
        else if (c instanceof PriorityQueue<?>) {
            PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            initFromPriorityQueue(pq);
        }
        else {
            this.comparator = null;
            initFromCollection(c);
        }
    }

    //將容器c中的元素新增到優先順序佇列中
    private void initElementsFromCollection(Collection<? extends E> c) {
        Object[] a = c.toArray();
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, a.length, Object[].class);
        int len = a.length;
        if (len == 1 || this.comparator != null)
            for (int i = 0; i < len; i++)
                if (a[i] == null)
                    throw new NullPointerException();
        this.queue = a;
        this.size = a.length;
    }

    //將優先順序佇列c中的元素新增到當前優先順序佇列中
    private void initFromPriorityQueue(PriorityQueue<? extends E> c) {
        if (c.getClass() == PriorityQueue.class) {
            this.queue = c.toArray();
            this.size = c.size();
        } else {
            initFromCollection(c);
        }
    }

    //將容器c中的元素新增到優先順序佇列中
    private void initFromCollection(Collection<? extends E> c) {
        initElementsFromCollection(c);
        heapify();
    }

    //建立包含優先順序佇列c中元素的佇列,且使用同一個比較器
    @SuppressWarnings("unchecked")
    public PriorityQueue(PriorityQueue<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();
        initFromPriorityQueue(c);
    }

    //建立包含排序集合c中元素的優先順序佇列,且使用同一個比較器
    @SuppressWarnings("unchecked")
    public PriorityQueue(SortedSet<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();
        initElementsFromCollection(c);
    }
}

    PriorityQueue中的入隊方法分析:

//add與offer沒有區別
public boolean add(E e) {
    return offer(e);
}


public boolean offer(E e) {
    //佇列中不允許有null元素
    if (e == null)
        throw new NullPointerException();
    modCount++;    //快速失敗機制
    int i = size;    //獲取當前佇列中元素個數
    //判斷陣列是否需要擴容
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;    //元素計數+1
    
    //新增元素的插入位置
    //若佇列原本為空,則直接放到0位置
    //若佇列原本不為空
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);    //插入陣列
    return true;
}

//擴容
private void grow(int minCapacity) {
    int oldCapacity = queue.length;    //佇列舊容量
    
    //擴容機制,佇列原容量小於64時,擴容為原來的2倍再加2
    //大於64,則擴大1.5倍
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);
}

//上浮
/**
* 上浮過程
* 假設已有一個有序堆(升序)如下所示:
*          10
*      /       \
*    20         40
*   /  \      /
*  60   70   90
* 現在要將元素30插入堆中,則有
* 1.將要插入的30先放在二叉堆的末尾
* 2.再將其與父結點進行比較,判斷是否要上浮(小於父結點就上浮)
* 3.若小於父結點則交換位置,再重複第2步驟繼續上浮
* 4.若大於則直接結束上浮
*          10                         10
*      /       \                  /       \
*    20         40      ——>     20         30
*   /  \      /   \            /  \      /   \
*  60   70   90    30        60   70   90    40
*/
private void siftUp(int k, E x) {
    //判斷佇列是自然排序還是比較器排序
    if (comparator != null)
        siftUpUsingComparator(k, x);    //比較器排序
    else
        siftUpComparable(k, x);    //自然排序
}

//入隊操作本質是一個堆排序中的一個上浮的過程
private void siftUpUsingComparator(int k, E x) {
    //判斷索引位置是否大於0,即是否到達堆頂
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (comparator.compare(x, (E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = x;
}

//另一個上浮方法,使用的自然排序
private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (key.compareTo((E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = key;
}

    PriorityQueue中的出隊方法分析:

public E poll() {
    if (size == 0)    //判斷佇列是否是空佇列
        return null;
    int s = --size;
    modCount++;
    E result = (E) queue[0];    //取出隊首元素
    E x = (E) queue[s];    //獲取隊尾元素
    queue[s] = null;    //隊尾賦null

    //將原本的隊尾元素放到堆頂,再對整個堆進行排序整理
    //即下沉
    if (s != 0)
        siftDown(0, x);    //下沉方法
    return result;
}

//下沉
/**
* 下沉過程
* 假設已有一個有序堆(升序)如下所示:
*          10
*      /       \
*    20         30
*   /  \      /    \
*  60   70   90     40
* 現在要將元素10出隊,則有
* 1.將要出隊的10移除出二叉堆,並將隊尾40放到堆頂
* 2.將堆頂元素與兩個子結點中較小的元素相比較,選擇小的元素作為新的堆頂元素
* 3.重複對堆中前一半結點進行將第2步的比較交換
*          40                         20
*      /       \                  /       \
*    20         30      ——>     40         30
*   /  \      /               /  \       /   
*  60   70   90             60   70    90 
*/
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);    //比較器下沉
    else
        siftDownComparable(k, x);    //自然排序下沉
}

//使用自然排序下沉
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;       //下沉要對堆中前一半的結點都進行
    while (k < half) {
        int child = (k << 1) + 1;     
        Object c = queue[child];    //獲取當前結點的左孩子
        int right = child + 1;    //右孩子索引
    
        //若存在右孩子,那麼左右孩子先比較大小,取小再與父結點比較
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];

        //父結點與子結點比較
        //若父結點小於子結點,則直接結束下沉的過程
        //否則,互動位置後繼續下沉操作
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}

//使用比較器下沉
@SuppressWarnings("unchecked")
private void siftDownUsingComparator(int k, E x) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = queue[child];
        int right = child + 1;
        if (right < size && comparator.compare((E) c, (E) queue[right]) > 0)
            c = queue[child = right];
        if (comparator.compare(x, (E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = x;
}

3.DelayQueue的繼續體系

    瞭解了DelayQueue的底層實際是通過PriorityQueue實現,再來看看DelayQueue的繼承關係,如下圖所示,父類及介面之前的學習中都已分析過,不在贅言。

 

4.Delay介面

    DelayQueue佇列與其他佇列最明顯的不同之處,就是它的延時功能,也正因為這個延時特點,DelayQueue中的物件都必須要實現Delay介面,接下來就看看這個Delay介面是幹什麼的。

//用來標記那些應該在給定延遲時間之後執行的物件
public interface Delayed extends Comparable<Delayed> {
    //檢查延遲是否結束,該方法返回一個延遲時間,時間到後在檢查還有沒有
    //延遲,若沒有延遲執行下一步,若還有延遲,繼續等待
    long getDelay(TimeUnit unit);
}

    DelayQueue的使用示例:

/**
* 延遲佇列的使用示例
* 主執行緒建立三個延遲任務放到queue中,其他三個執行緒
* 在任務可用時取出
* Created by bzhang on 2019/4/1.
*/
public class TestDelayed implements Delayed {
      private String name;
      private Date takeTime;  //延遲時間

      public TestDelayed(String name, Date takeTime) {
            this.name = name;
            this.takeTime = takeTime;
      }

      public String getName() {
            return name;
      }

      public void setName(String name) {
            this.name = name;
      }

      public Date getTakeTime() {
            return takeTime;
      }

      public void setTakeTime(Date takeTime) {
            this.takeTime = takeTime;
      }

      @Override
      public long getDelay(TimeUnit unit) {
            long convert = unit.convert(takeTime.getTime()-System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            return convert;
      }

      @Override
      public int compareTo(Delayed o) {
            TestDelayed t = (TestDelayed)o;
            long l = this.takeTime.getTime() - t.getTakeTime().getTime();
            if (l==0){
                  return 0;
            }
            return l > 0 ? 1 : -1;
      }

      @Override
      public String toString() {
            return "TestDelayed{" +
                    "name='" + name + '\'' +
                    ", takeTime=" + takeTime +
                    '}';
      }

      public static void main(String[] args) {
            DelayQueue queue = new DelayQueue();
            long l = System.currentTimeMillis();
            queue.put(new TestDelayed("A",new Date(l+5000)));
            queue.put(new TestDelayed("B",new Date(l+2000)));
            queue.put(new TestDelayed("C",new Date(l+7000)));

            System.out.println(new Date());
            int t = 0;
            for (int i = 0;i < 3;i++){
                  new Thread(new Runnable() {
                        @Override
                        public void run() {
                              try {
                                    System.out.println(Thread.currentThread().getName()+queue.take());
                              } catch (InterruptedException e) {
                                    e.printStackTrace();
                              }
                        }
                  }).start();
            }


      }
}

//結果
Tue Apr 02 11:03:33 CST 2019
Thread-1TestDelayed{name='B', takeTime=Tue Apr 02 11:03:35 CST 2019}
Thread-0TestDelayed{name='A', takeTime=Tue Apr 02 11:03:38 CST 2019}
Thread-2TestDelayed{name='C', takeTime=Tue Apr 02 11:03:40 CST 2019}

5.DelayQueue中的重要屬性及構造方法

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    //重入鎖,用於保證併發安全
    private final transient ReentrantLock lock = new ReentrantLock();

    //底層優先順序佇列,實際元素都儲存與該佇列中,底層是陣列構成的二叉堆
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    //下一個等待獲取元素的執行緒,可減少不必要的等待
    private Thread leader = null;

    //條件控制,表示是否可以從佇列中取資料
    private final Condition available = lock.newCondition();


    public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

}

6.DelayQueue的入隊方法

//add方法本質就是呼叫offer方法,將元素新增到佇列
public boolean add(E e) {
    return offer(e);
}

//同上
public void put(E e) {
    offer(e);
}

//延遲佇列是無界佇列,指定超時時間放入元素沒有意義,與直接入隊是一樣的
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

//向佇列中新增元素,元素位置以比較結果(compareTo方法)來確定
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);    //呼叫底層優先順序佇列的offer方法來儲存元素

        //判斷底層優先順序佇列的隊首是否是新增元素
        if (q.peek() == e) {
            leader = null;

            //喚醒條件等待佇列的某一個執行緒,即說明佇列中有元素了,
            //可以從佇列中獲取到元素了
            available.signal();    
        }
        return true;
    } finally {
        lock.unlock();
    }
}

7.DelayQueue的出隊方法

//返回延遲時間已到的第一個元素,或返回null(沒有元素或元素延遲時間都未到)
public E poll() {
    final ReentrantLock lock = this.lock;    //重入鎖
    lock.lock();    //加鎖同步
    try {
        E first = q.peek();    //獲取優先順序佇列中的隊首元素

        //判斷佇列是否為空,若不為空那麼隊首延遲時間是否到達,若都不滿足
        //說明隊首元素可用,返回隊首
        //否則返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

//若有延遲時間已到的元素就立即返回,若無則一直等待
//佇列中無元素那麼也一直等待
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();    //可被中斷鎖
    try {
        for (;;) {    //自旋
            E first = q.peek();    //獲取隊首元素

            //若佇列為空,直接進入條件佇列等待喚醒
            //佇列不為空,則判斷隊首的延時是否到達
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);    //獲取剩餘延遲時間(單位是ns)
                if (delay <= 0)    //沒有剩餘延遲時間,則將隊首元素返回
                    return q.poll();
                first = null; 
    
                //判斷是否已經有其他執行緒在等待取元素
                //若有,那麼就讓當前執行緒直接等待
                //若沒有,那就說明當前只有本執行緒在等待獲取隊首元素
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();    //獲取當前執行緒
                    leader = thisThread;    //將單籤執行緒設為等待獲取隊首的執行緒
                    try {
                        //等待隊首元素的延遲時間後,在嘗試獲取隊首元素
                        available.awaitNanos(delay);    
                    } finally {

                        //將等待獲取的執行緒設為null,因為當前執行緒正在獲取,因此不應該有leader
                        //即leader為null,說明要麼有執行緒正在執行獲取操作,要麼沒有出隊操作在進行
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //當前執行緒已經取完元素了,可以喚醒其他執行緒獲取隊首元素了
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

//指定時間內獲取延遲的隊首元素,若在指定等待時間內隊首延遲時間未到達或佇列為空
//就返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();

            //佇列是否為空,若為空佇列,那麼在指定等待是否到達,若等待時間也已到達
            //那就返回null,若未到達等待時間,就繼續等待
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);    //當前執行緒進入等待時間nanos納秒
            } else {
                long delay = first.getDelay(NANOSECONDS);    //獲取隊首元素的延遲時間

                //判斷延遲時間是否到達,到達就直接將隊首元素返回
                if (delay <= 0)
                    return q.poll();

                //延遲時間未到,但等待時間已經達到,那麼就返回null
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting

                //延遲時間小於等待時間,說明可以在等待時間內獲取到隊首元素
                //那麼就在等待延遲時間到達的時間內,可以再次嘗試將隊首元素獲取返回
                //這裡僅是再次嘗試,因為可能在等待期間內有新的元素入隊,且延遲時間最小成為新隊首
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);

                else {
                    //等待時間 > 延遲時間 並且沒有其它執行緒在等待,
                    //那麼當前元素成為leader,表示當前執行緒最早正在等待獲取元素
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //讓等待時間到達
                        long timeLeft = available.awaitNanos(delay);
                        //繼續等待的時間
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

8.peek方法

//peek方法僅僅就是為底層的優先順序佇列的peek方法加上鎖
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.peek();
    } finally {
        lock.unlock();
    }
}

二、PriorityBlockingQueue併發容器

1.PriorityBlockingQueue的底層實現

    PriorityBlockingQueue是一個執行緒安全的無界阻塞佇列,可以看對是PriorityQueue的多執行緒版本,其底層資料結構與PriorityQueue相同,都是陣列實現的利用二叉堆結構。前文已經分析過,這裡不再多說

 

2.PriorityBlockingQueue的繼承體系

    PriorityBlockingQueue的繼承關係如下圖所示,均是之前學習過的父類或介面。這裡不再展開。

3.PriorityBlockingQueue中的重要屬性及構造方法

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    //未指定佇列初始容量時使用的預設容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    //佇列雖然說是無界的,但實際佇列是不能超過Integer.MAX_VALUE - 8這個值的
    //若是超過報OOM異常
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    //底層存放資料的陣列
    private transient Object[] queue;

    //佇列中元素的個數,計數器
    private transient int size;

    //用於判斷優先順序的比較器,若為null則使用自然排序
    private transient Comparator<? super E> comparator;

    //重入鎖,保證併發安全
    private final ReentrantLock lock;

    //佇列非空條件,用於出隊操作
    private final Condition notEmpty;

    //用於佇列顯示是否處於擴容狀態,0表示沒有在擴容
    //而1表示處於擴容狀態,將該值更新成1的執行緒會進行陣列擴容
    //其他要進行擴容的執行緒檢查該值發現為1,則直接暫停執行緒讓出CPU
    private transient volatile int allocationSpinLock;

    //將佇列轉換成執行緒不安全的優先順序佇列,用於序列化
    private PriorityQueue<E> q;

    //建立一個預設初始容量的佇列
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    //建立一個指定初始容量的佇列
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    //建立一個指定容量和比較器的佇列
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

    //以集合c為底,建立一個佇列
    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls

        //根據集合c是哪一種容器來決定建立怎樣的初始佇列
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }
}

4.入隊方法

//PriorityBlockingQueue所有的入隊方法,都一樣,因為佇列是無界佇列
//不存在加入佇列失敗的可能,因此最終都是呼叫offer方法
public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}

public boolean offer(E e) {

    //優先順序佇列中不允許存在null元素,因此null元素無法確定優先順序
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;    //n為當前佇列中的元素個數,cap為當前佇列的容量
    Object[] array;

    //判斷底層陣列是否需要擴容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;

        //判斷是使用比較器進行上浮操作,還是使用自然排序進行上浮操作
        if (cmp == null)
            siftUpComparable(n, e, array);    //自然排序上浮
        else
            siftUpUsingComparator(n, e, array, cmp);    //比較器上浮
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

//自然上浮,與PriorityQueue中一樣
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;    //獲取父結點索引
        Object e = array[parent];    //父結點

        //比較插入的值與父結點的大小,若比父結點小,那麼交換位置後,在繼續比較
        //若比父結點大,說明排序正確,無需在繼續比較
        if (key.compareTo((T) e) >= 0)    
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

//比較器比較上浮
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

//陣列擴容
private void tryGrow(Object[] array, int oldCap) {
    // 擴容時不需要加鎖,因為擴容是通過CAS方式來實現的,
    //這樣不僅可以提升效率,並且不影響出隊操作
    lock.unlock();     
    Object[] newArray = null;

    //將allocationSpinLock更新成1的執行緒進行陣列擴容操作,其餘要擴容的執行緒暫停
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //擴容規則,容量小於64,擴大2倍+2,容量不小於64,則擴大1.5倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            //判斷擴大後的容量是否越界
            //若是會越界,則擴容規則改為舊容量+1,若仍越界,報OOM異常
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];    //建立新陣列
        } finally {
            allocationSpinLock = 0;    //恢復為0,表示沒有在擴容狀態
        }
    }
    if (newArray == null)     //未競爭到擴容操作的執行緒暫停
        Thread.yield();
    lock.lock();    /重新上鎖
    if (newArray != null && queue == array) {
        queue = newArray;
        //將舊陣列中的資料轉移到新陣列中
        System.arraycopy(array, 0, newArray, 0, oldCap);    
    }
}

5.出隊方法

//獲取並移除隊首元素,若佇列為空,返回null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();    //真正出隊的方法
    } finally {
        lock.unlock();
    }
}

//真正執行獲取並移除隊首元素的方法
private E dequeue() {
    int n = size - 1;      //移除隊首後佇列中的元素個數 ,同時也是隊尾元素的索引 

    //判斷佇列是否為空佇列,空佇列直接返回null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;    //獲取底層陣列引用
        E result = (E) array[0];    //獲取隊首元素
        E x = (E) array[n];    //獲取隊尾元素
        array[n] = null;    //隊尾置為null
        Comparator<? super E> cmp = comparator;

        //將原來佇列的隊尾放到隊首位置,然後進行下沉操作(即二叉堆重新排序的操作)
        if (cmp == null)
            siftDownComparable(0, x, array, n);    //使用自然排序下沉
        else
            siftDownUsingComparator(0, x, array, n, cmp);    //使用比較器下沉
        size = n;
        return result;
    }
}

//下沉操作與PriorityQueue中相同,這裡不在多做分析
//自然排序下沉
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

//比較器下沉
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}


//獲取並移除隊首元素,若佇列已空,則等待
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        //若返回的元素為null,說明佇列中沒有元素
        //那麼讓當前執行緒進入條件佇列中等待,當前佇列有元素時,則
        //會喚醒執行緒,在嘗試獲取並移除隊首
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

//在一定時間內嘗試獲取並移除隊首元素,若在指定時間內未成功,
//返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {

        //嘗試獲取並移除隊首,若失敗但超時時間未到,則進入條件等待
        //一段時間後在進行嘗試,若超時時間已過仍為成功獲取並移除隊首
        //則返回null
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

//獲取但不移除隊首元素
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (size == 0) ? null : (E) queue[0];
    } finally {
        lock.unlock();
    }
}
6.總結     1.PriorityBlocking Queue是基於陣列實現的二叉堆結構。     2.PriorityBlocking Queue中涉及到元素之間的比較,因此不能存在null元素。     3.PriorityBlocking Queue的入隊出隊操作執行緒安全是通過重入鎖ReentrantLock實現的,但在擴容時是基於CAS演算法實現的。     4.PriorityBlocking Queue是無界佇列,其入隊出隊規則是基於優先順序的,雖然說是無界佇列,但並不是無限大的,容量不能超過 Integer.MAX_VALUE - 8。