執行緒池中常用的阻塞佇列簡述
阿新 • • 發佈:2018-12-26
一、ArrayBlockingQueue
基於陣列的阻塞佇列,有界佇列,按照先進先出(FIFO)的形式,初始化是必須指定capacity.看一下原始碼:
/**第一種構造方法,指定初始容量*/ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /**第二種構造方法,指定初始容量和一個標誌,true:公平鎖,false:非公平鎖*/ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /**第三種構造方法,指定初始容量和一個標誌,true:公平鎖,false:非公平鎖;指定一個初始化的集合*/ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
二、LinkedBlockingQueue
基於連結串列的阻塞佇列,元素按照先進先出(FIFO)的策略。原始碼如下:
/**無界佇列*/ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /**指定初始化容量*/ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } /**無界佇列,並指定初始化的集合*/ public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
三、SynchronousBlockingQueue
容量為0,不儲存任務,通俗地講就是有一個處理一個。原始碼如下:
/**第一種無參,this指定的是第二種構造方法*/ public SynchronousQueue() { this(false); } /**第二種,指定fair true:雙端佇列 false:雙端棧*/ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
四、PriorityBlockingQueue
這是一個無界有序的阻塞佇列,排序規則和之前介紹的PriorityQueue一致,只是增加了阻塞操作。同樣的該佇列不支援插入null元素,同時不支援插入非comparable的物件。它的迭代器並不保證佇列保持任何特定的順序,如果想要順序遍歷,考慮使用Arrays.sort(pq.toArray())。該類不保證同等優先順序的元素順序,如果你想要強制順序,就需要考慮自定義順序或者是Comparator使用第二個比較屬性。原始碼如下:
/**第一種,建立容量的11的佇列*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**第二種,指定初始化容量*/
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
/**第三種,指定初始化容量和比較規則*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
/**第四種,指定初始化集合,使用鎖*/
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
它使用如下方法進行擴容:
/***/
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
其先放開了鎖,然後通過CAS設定allocationSpinLock來判斷哪個執行緒獲得了擴容許可權,如果沒搶到許可權就會讓出CPU使用權。最後還是要鎖住開始真正的擴容。擴容許可權爭取到了就是計算大小,分配陣列。擴容的大小時原有的一倍。