jdk 常用的queue
阿新 • • 發佈:2019-01-10
queue佇列,先進先出
1、優先順序佇列,元素有優先順序
public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable
佇列使用堆排序,二叉樹,使用陣列儲存資料,非執行緒安全
入隊,新節點一直跟自己的父節點比較,不復合比較條件,交換位置,直到根節點
public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) grow(i + 1); size = i + 1; if (i == 0) queue[0] = e; else siftUp(i, e); return true; }
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
出隊,取出根節點值,把最後一個節點放到根節點,然後比較根節點和左右兩個子節點,找到正確位置private void siftUpUsingComparator(int k, E x) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; }
private void siftDownUsingComparator(int k, E x) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = queue[child]; int right = child + 1; if (right < size && comparator.compare((E) c, (E) queue[right]) > 0) c = queue[child = right]; if (comparator.compare(x, (E) c) <= 0) break; queue[k] = c; k = child; } queue[k] = x; }
對應執行緒安全的PriorityBlockingQueue,使用ReentrantLock,在入隊出隊總量等方法加鎖,出隊無值時候加入Condition佇列
2、延時佇列,元素有優先順序和延時時間,出隊時,時間未到,執行緒等待
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
返回並移出佇列頭,如果元素沒有到過期時間,則等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)// 已經有一個執行緒先來取值了,則當前執行緒無期等待
available.await();
else {
Thread thisThread = Thread.currentThread();//設定當前執行緒為最先來取值的執行緒,
leader = thisThread;
try {
available.awaitNanos(delay);//等待指定時間,到時間自動喚醒
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();//leader執行緒需要取完值後喚醒等待佇列上的第一個執行緒,使它去嘗試取新的佇列頭
lock.unlock();
}
}
3、阻塞佇列BlockingQueue,即入隊時空間不足,執行緒等待有空間時繼續入隊,出隊時佇列為空,執行緒等待直到有元素
常用的有ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue
ArrayBlockingQueue,初始化有界陣列,陣列儲存
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
兩個Condition,分別用來等待入隊的執行緒,和出隊的執行緒,入隊後喚醒notEmpty,出隊後喚醒notFull
LinkedBlockingQueue,有界,或者預設Integer.MAX_VALUE,使用連結串列儲存節點,
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
使用了兩個ReentrantLock分別控制入隊出隊,原理基本和ArrayBlockingQueue一致,Executors.newFixedThreadPool使用此佇列
SynchronousQueue,沒有容量或者"只有一個長度",入隊的執行緒直接阻塞,直到有出隊的執行緒出現,雙方交換資料,有點類似握手
有公平非公平兩種模式,預設非公平,