一、DelayQueue併發容器 1.Delay Queue的底層實現     Delay Queue是一個執行緒安全且無界的阻塞佇列,只有在延遲時間滿足後才能獲取佇列中的元素,因此佇列中的元素必須實現Delay介面,在建立元素時指定多久時間後才能從佇列中獲取該元素。Delay Queue的底層實現是使用了PriorityQueue+ReentrantLock來實現延遲獲取功能。   2.PriorityQueue分析     其中PriorityQueue是種優先順序佇列,執行緒不安全,佇列中的元素會按照優先順序來排序。該佇列底層實現是使用二叉堆,並且元素按照其自然順序進行排序,或者根據構造佇列時提供的Comparator進行排序。因為PriorityQueue中的元素都要進行比較,所以優先順序佇列中不能擁有null元素,也不能有不能比較的元素。     PriorityQueue的繼承關係如下圖:
public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    transient Object[] queue; // non-private to simplify nested class access

    private int size = 0;

    private final Comparator<? super E> comparator;

    transient int modCount = 0; 

    public PriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);

    public PriorityQueue(int initialCapacity) {
        this(initialCapacity, null);

    public PriorityQueue(Comparator<? super E> comparator) {
        this(DEFAULT_INITIAL_CAPACITY, comparator);

    public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];    //初始化底層陣列
        this.comparator = comparator;    //比較器初始化

    public PriorityQueue(Collection<? extends E> c) {

        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();

        else if (c instanceof PriorityQueue<?>) {
            PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
        else {
            this.comparator = null;

    private void initElementsFromCollection(Collection<? extends E> c) {
        Object[] a = c.toArray();
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, a.length, Object[].class);
        int len = a.length;
        if (len == 1 || this.comparator != null)
            for (int i = 0; i < len; i++)
                if (a[i] == null)
                    throw new NullPointerException();
        this.queue = a;
        this.size = a.length;

    private void initFromPriorityQueue(PriorityQueue<? extends E> c) {
        if (c.getClass() == PriorityQueue.class) {
            this.queue = c.toArray();
            this.size = c.size();
        } else {

    private void initFromCollection(Collection<? extends E> c) {

    public PriorityQueue(PriorityQueue<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();

    public PriorityQueue(SortedSet<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();


public boolean add(E e) {
    return offer(e);

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;    //快速失敗機制
    int i = size;    //獲取當前佇列中元素個數
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;    //元素計數+1
    if (i == 0)
        queue[0] = e;
        siftUp(i, e);    //插入陣列
    return true;

private void grow(int minCapacity) {
    int oldCapacity = queue.length;    //佇列舊容量
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);

* 上浮過程
* 假設已有一個有序堆(升序)如下所示:
*          10
*      /       \
*    20         40
*   /  \      /
*  60   70   90
* 現在要將元素30插入堆中,則有
* 1.將要插入的30先放在二叉堆的末尾
* 2.再將其與父結點進行比較,判斷是否要上浮(小於父結點就上浮)
* 3.若小於父結點則交換位置,再重複第2步驟繼續上浮
* 4.若大於則直接結束上浮
*          10                         10
*      /       \                  /       \
*    20         40      ——>     20         30
*   /  \      /   \            /  \      /   \
*  60   70   90    30        60   70   90    40
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x);    //比較器排序
        siftUpComparable(k, x);    //自然排序

private void siftUpUsingComparator(int k, E x) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (comparator.compare(x, (E) e) >= 0)
        queue[k] = e;
        k = parent;
    queue[k] = x;

private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (key.compareTo((E) e) >= 0)
        queue[k] = e;
        k = parent;
    queue[k] = key;


public E poll() {
    if (size == 0)    //判斷佇列是否是空佇列
        return null;
    int s = --size;
    E result = (E) queue[0];    //取出隊首元素
    E x = (E) queue[s];    //獲取隊尾元素
    queue[s] = null;    //隊尾賦null

    if (s != 0)
        siftDown(0, x);    //下沉方法
    return result;

* 下沉過程
* 假設已有一個有序堆(升序)如下所示:
*          10
*      /       \
*    20         30
*   /  \      /    \
*  60   70   90     40
* 現在要將元素10出隊,則有
* 1.將要出隊的10移除出二叉堆,並將隊尾40放到堆頂
* 2.將堆頂元素與兩個子結點中較小的元素相比較,選擇小的元素作為新的堆頂元素
* 3.重複對堆中前一半結點進行將第2步的比較交換
*          40                         20
*      /       \                  /       \
*    20         30      ——>     40         30
*   /  \      /               /  \       /   
*  60   70   90             60   70    90 
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);    //比較器下沉
        siftDownComparable(k, x);    //自然排序下沉

private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;       //下沉要對堆中前一半的結點都進行
    while (k < half) {
        int child = (k << 1) + 1;     
        Object c = queue[child];    //獲取當前結點的左孩子
        int right = child + 1;    //右孩子索引
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];

        if (key.compareTo((E) c) <= 0)
        queue[k] = c;
        k = child;
    queue[k] = key;

private void siftDownUsingComparator(int k, E x) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = queue[child];
        int right = child + 1;
        if (right < size && comparator.compare((E) c, (E) queue[right]) > 0)
            c = queue[child = right];
        if (comparator.compare(x, (E) c) <= 0)
        queue[k] = c;
        k = child;
    queue[k] = x;






public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);


* 延遲佇列的使用示例
* 主執行緒建立三個延遲任務放到queue中,其他三個執行緒
* 在任務可用時取出
* Created by bzhang on 2019/4/1.
public class TestDelayed implements Delayed {
      private String name;
      private Date takeTime;  //延遲時間

      public TestDelayed(String name, Date takeTime) {
            this.name = name;
            this.takeTime = takeTime;

      public String getName() {
            return name;

      public void setName(String name) {
            this.name = name;

      public Date getTakeTime() {
            return takeTime;

      public void setTakeTime(Date takeTime) {
            this.takeTime = takeTime;

      public long getDelay(TimeUnit unit) {
            long convert = unit.convert(takeTime.getTime()-System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            return convert;

      public int compareTo(Delayed o) {
            TestDelayed t = (TestDelayed)o;
            long l = this.takeTime.getTime() - t.getTakeTime().getTime();
            if (l==0){
                  return 0;
            return l > 0 ? 1 : -1;

      public String toString() {
            return "TestDelayed{" +
                    "name='" + name + '\'' +
                    ", takeTime=" + takeTime +

      public static void main(String[] args) {
            DelayQueue queue = new DelayQueue();
            long l = System.currentTimeMillis();
            queue.put(new TestDelayed("A",new Date(l+5000)));
            queue.put(new TestDelayed("B",new Date(l+2000)));
            queue.put(new TestDelayed("C",new Date(l+7000)));

            System.out.println(new Date());
            int t = 0;
            for (int i = 0;i < 3;i++){
                  new Thread(new Runnable() {
                        public void run() {
                              try {
                              } catch (InterruptedException e) {


Tue Apr 02 11:03:33 CST 2019
Thread-1TestDelayed{name='B', takeTime=Tue Apr 02 11:03:35 CST 2019}
Thread-0TestDelayed{name='A', takeTime=Tue Apr 02 11:03:38 CST 2019}
Thread-2TestDelayed{name='C', takeTime=Tue Apr 02 11:03:40 CST 2019}


public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();

    private final PriorityQueue<E> q = new PriorityQueue<E>();

    private Thread leader = null;

    private final Condition available = lock.newCondition();

    public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) {



public boolean add(E e) {
    return offer(e);

public void put(E e) {

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    try {
        q.offer(e);    //呼叫底層優先順序佇列的offer方法來儲存元素

        if (q.peek() == e) {
            leader = null;

        return true;
    } finally {


public E poll() {
    final ReentrantLock lock = this.lock;    //重入鎖
    lock.lock();    //加鎖同步
    try {
        E first = q.peek();    //獲取優先順序佇列中的隊首元素

        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
            return q.poll();
    } finally {

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();    //可被中斷鎖
    try {
        for (;;) {    //自旋
            E first = q.peek();    //獲取隊首元素

            if (first == null)
            else {
                long delay = first.getDelay(NANOSECONDS);    //獲取剩餘延遲時間(單位是ns)
                if (delay <= 0)    //沒有剩餘延遲時間,則將隊首元素返回
                    return q.poll();
                first = null; 
                if (leader != null)
                else {
                    Thread thisThread = Thread.currentThread();    //獲取當前執行緒
                    leader = thisThread;    //將單籤執行緒設為等待獲取隊首的執行緒
                    try {
                    } finally {

                        if (leader == thisThread)
                            leader = null;
    } finally {
        if (leader == null && q.peek() != null)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    try {
        for (;;) {
            E first = q.peek();

            if (first == null) {
                if (nanos <= 0)
                    return null;
                    nanos = available.awaitNanos(nanos);    //當前執行緒進入等待時間nanos納秒
            } else {
                long delay = first.getDelay(NANOSECONDS);    //獲取隊首元素的延遲時間

                if (delay <= 0)
                    return q.poll();

                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting

                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);

                else {
                    //等待時間 > 延遲時間 並且沒有其它執行緒在等待,
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
    } finally {
        if (leader == null && q.peek() != null)


public E peek() {
    final ReentrantLock lock = this.lock;
    try {
        return q.peek();
    } finally {








public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    //佇列雖然說是無界的,但實際佇列是不能超過Integer.MAX_VALUE - 8這個值的
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private transient Object[] queue;

    private transient int size;

    private transient Comparator<? super E> comparator;

    private final ReentrantLock lock;

    private final Condition notEmpty;

    private transient volatile int allocationSpinLock;

    private PriorityQueue<E> q;

    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];

    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls

        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        this.queue = a;
        this.size = n;
        if (heapify)


public boolean add(E e) {
    return offer(e);

public void put(E e) {
    offer(e); // never need to block

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block

public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    int n, cap;    //n為當前佇列中的元素個數,cap為當前佇列的容量
    Object[] array;

    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;

        if (cmp == null)
            siftUpComparable(n, e, array);    //自然排序上浮
            siftUpUsingComparator(n, e, array, cmp);    //比較器上浮
        size = n + 1;
    } finally {
    return true;

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;    //獲取父結點索引
        Object e = array[parent];    //父結點

        if (key.compareTo((T) e) >= 0)    
        array[k] = e;
        k = parent;
    array[k] = key;

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
        array[k] = e;
        k = parent;
    array[k] = x;

private void tryGrow(Object[] array, int oldCap) {
    // 擴容時不需要加鎖,因為擴容是通過CAS方式來實現的,
    Object[] newArray = null;

    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];    //建立新陣列
        } finally {
            allocationSpinLock = 0;    //恢復為0,表示沒有在擴容狀態
    if (newArray == null)     //未競爭到擴容操作的執行緒暫停
    lock.lock();    /重新上鎖
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);    


public E poll() {
    final ReentrantLock lock = this.lock;
    try {
        return dequeue();    //真正出隊的方法
    } finally {

private E dequeue() {
    int n = size - 1;      //移除隊首後佇列中的元素個數 ,同時也是隊尾元素的索引 

    if (n < 0)
        return null;
    else {
        Object[] array = queue;    //獲取底層陣列引用
        E result = (E) array[0];    //獲取隊首元素
        E x = (E) array[n];    //獲取隊尾元素
        array[n] = null;    //隊尾置為null
        Comparator<? super E> cmp = comparator;

        if (cmp == null)
            siftDownComparable(0, x, array, n);    //使用自然排序下沉
            siftDownUsingComparator(0, x, array, n, cmp);    //使用比較器下沉
        size = n;
        return result;

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
            array[k] = c;
            k = child;
        array[k] = key;

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
            array[k] = c;
            k = child;
        array[k] = x;

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    E result;
    try {
        while ( (result = dequeue()) == null)
    } finally {
    return result;

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    E result;
    try {

        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
    return result;

public E peek() {
    final ReentrantLock lock = this.lock;
    try {
        return (size == 0) ? null : (E) queue[0];
    } finally {
6.總結     1.PriorityBlocking Queue是基於陣列實現的二叉堆結構。     2.PriorityBlocking Queue中涉及到元素之間的比較,因此不能存在null元素。     3.PriorityBlocking Queue的入隊出隊操作執行緒安全是通過重入鎖ReentrantLock實現的,但在擴容時是基於CAS演算法實現的。     4.PriorityBlocking Queue是無界佇列,其入隊出隊規則是基於優先順序的,雖然說是無界佇列,但並不是無限大的,容量不能超過 Integer.MAX_VALUE - 8。