java併發程式設計——阻塞佇列與非阻塞佇列
ArrayBlockingQueue
ArrayBlockingQueue是一個有界阻塞佇列,資料結構基於陣列、使用ReentrantLock、Condition保證併發同步。
所謂阻塞佇列
當佇列滿了,則會對生產執行緒產生阻塞直到有空位可插入;
當佇列空了,則會對消費佇列產生阻塞直到有新的元素被加入佇列。
方法中含有字母t的都會產生阻塞waiting;
方法中含有o的都會返回 true/false;
剩下add、remove的會丟擲異常;
peek()會從佇列頭部觀察頭結點,但並不會對佇列造成影響。
我們通過一個簡單的應用,來逐步分析ArrayBlockingQueue佇列的程式碼:
public class ArrayBlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService ex = Executors.newFixedThreadPool(50);
ArrayBlockingQueue<CustomizedTask> tasksQueue = new ArrayBlockingQueue<CustomizedTask>(100);//有界佇列 100個元素
// 生產者執行緒
new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
tasksQueue.put(new CustomizedTask());
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
// 消費者執行緒
new Thread(new Runnable() {
@Override
public void run() {
CustomizedTask task;
try {
while ((task = tasksQueue.take()) != null && !Thread.currentThread().isInterrupted()) {
ex.submit(task);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Main Thread is terminated");
}
static class CustomizedTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
}
}
}
1.構造:
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];//全域性變數,一個Object[]陣列用來維護入隊元素
lock = new ReentrantLock(fair);//ReentrantLock.Condition實現等待\通知
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.入佇列。生產者生產訊息並放入佇列
public void put(E e) throws InterruptedException {
checkNotNull(e);//入隊元素正確性判斷
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//獲取鎖
try {
while (count == items.length)//如果佇列中資料已經達到佇列上限
notFull.await();//阻塞並釋放鎖(此時當前執行緒進入Condition佇列併產生park阻塞)
enqueue(e);//當佇列中有空位存在的時,執行入隊
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;//putIndex初始化為0,每次插入元素後遞增
if (++putIndex == items.length)//達到上限
putIndex = 0;
count++;//Number of elements in the queue
//通知阻塞在佇列上的消費者(AQS:在獲取到鎖的情況下,將阻塞在Condition佇列的結點放入sync佇列中,等待被喚醒再次嘗試鎖獲取)
notEmpty.signal();
}
3.出佇列。消費者如果阻塞會被喚醒,並且進行鎖獲取和取佇列元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)//如果是個空佇列
notEmpty.await();//阻塞直到佇列進入元素同時釋放鎖
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//陣列中取數
items[takeIndex] = null;//取數後釋放佔用
if (++takeIndex == items.length)
takeIndex = 0;
count--;//佇列中總元素數目減1
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//喚醒阻塞的等待消費的執行緒
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是一個有界阻塞佇列,基於連結串列結構實現,預設capacity為Integer.MAX_VALUE。
我們通過一個簡單的應用,來逐步分析LinkedBlockingQueue佇列的程式碼:
public class LinkedBlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService ex = Executors.newFixedThreadPool(50);
LinkedBlockingQueue<CustomizedTask> tasksQueue = new LinkedBlockingQueue<CustomizedTask>(100);
// 生產者執行緒
new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
tasksQueue.put(new CustomizedTask());
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
// 消費者執行緒
new Thread(new Runnable() {
@Override
public void run() {
CustomizedTask task;
try {
while ((task = tasksQueue.take()) != null && !Thread.currentThread().isInterrupted()) {
ex.submit(task);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Main Thread is terminated");
}
static class CustomizedTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
}
}
}
1.初始化構造:
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater than
* zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//構造連結串列的頭尾結點,連結串列的初始化
}
1.1 連結串列資料結構
/**
* Linked list node class
* 一個簡單的單向連結串列
*/
static class Node<E> {
E item;
/**
* One of: - the real successor Node - this Node, meaning the successor
* is head.next - null, meaning there is no successor (this is the last
* node)
*/
Node<E> next;
Node(E x) {
item = x;
}
}
2.入佇列。生產者生產訊息並放入佇列
public void put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);//插入的物件包裝為一個結點
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//獲取putLcok
try {
/*
* Note that count is used in wait guard even though it is not
* protected by lock. This works because count can only decrease at
* this point (all other puts are shut out by lock), and we (or some
* other waiting put) are signalled if it ever changes from
* capacity. Similarly for all other uses of count in other wait
* guards.
*/
while (count.get() == capacity) {//佇列內元素達到上限
notFull.await();//condition等待
}
enqueue(node);//在佇列不滿的情況下 插入元素
c = count.getAndIncrement();//容量計數
if (c + 1 < capacity)//佇列是否可以再插入一個元素
notFull.signal();//喚醒在 putLock.condition等待的執行緒,執行緒執行插入操作。
} finally {
putLock.unlock();
}
if (c == 0)//如果佇列再進入這個操作之前是空的,那麼現在不空了(剛插入一個元素),喚醒因為佇列空而阻塞的取數執行緒
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;//尾部插入一個元素,並且把last引用指向這個元素
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
3.出佇列。消費者如果阻塞會被喚醒,並且進行鎖獲取和取佇列元素
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {//佇列為空,則阻塞取操作直到佇列不空
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)//如果進入這個操作之前佇列中元素超過1個(比如2個),則表示這個操作取數後依舊不為空(起碼還有1個),那麼可以喚醒其他因為佇列為空而阻塞的執行緒
notEmpty.signal();
} finally {
takeLock.unlock();
}
//喚醒這個操作執行之前因為佇列慢而產生的阻塞,起碼這個操作之後會有一個空位
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;//head的下個元素。可以看到是按照 FIFO佇列排序獲取的
//將這個元素從佇列中清除(出隊)
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
DelayedQueue
一個無界的阻塞佇列,其中的元素需要是先Delayed介面,對元素的提取加入了延期限制
當元素的過期時間到了才允許從佇列中取出。佇列頭部的元素是等待時間最久的元素。
如果插入資料增加會自動擴容,建立新的更大的陣列並將原陣列資料放入(PriorityQueue)。
如果沒有元素到了過期時間,那麼佇列頭head不存在,並且poll操作返回null。
當一個元素到了過期時間,那麼它的getDelay(TimeUnit.NANOSECONDS)方法將會返回一個小於0的數字。佇列中不允許放入null元素。
還是用一個Demo來入手原始碼的分析:
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();
producer(delayQueue);
consumer(delayQueue);// Consumer 1
consumer(delayQueue);// Consumer 2
}
/**
* 每100毫秒建立一個物件,放入延遲佇列,延遲時間1毫秒
* @param delayQueue
*/
private static void producer(final DelayQueue<DelayedElement> delayQueue) {
// offer
new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (true) {
i++;
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
DelayedElement element = new DelayedElement(1000 * 60 * 2, "test" + i);// 2min
System.out.println("offer success " + delayQueue.offer(element));
}
}
},"Producer").start();
/**
* 每秒列印延遲佇列中的物件個數
*/
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("delayQueue size:" + delayQueue.size());
}
}
},"Watcher").start();
}
/**
* take
*
* 消費者,從延遲佇列中獲得資料,進行處理
* @param delayQueue
*/
private static void consumer(final DelayQueue<DelayedElement> delayQueue) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
DelayedElement element = null;
try {
element = delayQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + "---" + element);
}
}
},"Consumer").start();
}
}
class DelayedElement implements Delayed {
private final long delay; // 延遲時間
private final long expire; // 到期時間
private final String msg; // 資料
private final long now; // 建立時間
public DelayedElement(long delay, String msg) {
this.delay = delay;
this.msg = msg;
expire = System.currentTimeMillis() + delay; // 到期時間 = 當前時間+延遲時間
now = System.currentTimeMillis();
}
/**
* 需要實現的介面,獲得延遲時間 用過期時間-當前時間
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 用於延遲佇列內部比較排序 當前時間的延遲時間 - 比較物件的延遲時間
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DelayedElement{");
sb.append("delay=").append(delay);
sb.append(", expire=").append(expire);
sb.append(", msg='").append(msg).append('\'');
sb.append(", now=").append(now);
sb.append('}');
return sb.toString();
}
}
1.構造初始化DelayedQ
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();//內部通過一個PriorityQueue儲存元素,而PriorityQueue內部通過陣列實現。這個priority會自動通過移動陣列元素進行擴容,類似ArrayList
private final Condition available = lock.newCondition();//同樣是通過condition實現
public DelayQueue() {
}
/**
* 執行緒被設計來用來等待佇列頭部的元素
*
* 這是 leader-follower模式的變體,為了最大限度減小不必要的時間等待
* 當一個執行緒成為 leader,它會等待直到頭結點過期,而其他執行緒會無限期的等待下去,直到這個leader被釋放並喚醒其他執行緒。
* leader 執行緒必須在從take()或者poll()等其他方法中返回前,通知啟用其他執行緒,並釋放leader引用
*
* 無論什麼時候頭結點被替換了一個更早過期的時間。
* 這個leader field 通過設定為null,被置為無效。
* 其他執行緒被喚醒然後準備獲取到接著釋放leadship。
*
*/
private Thread leader = null;
2.offer插入元素
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);//隊尾插入
if (q.peek() == e) {//佇列中僅有一個元素
leader = null;
available.signal();//可能存在其他執行緒因為佇列控而阻塞
}
return true;
} finally {
lock.unlock();
}
}
3.take提取陣列元素
/**
* Retrieves and removes the head of this queue, waiting if necessary until
* an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();//檢視佇列中的頭元素
if (first == null)//為null表示沒有可獲取的元素
available.await();//condition await
else {
long delay = first.getDelay(NANOSECONDS);//檢視這個元資料的過期時間
if (delay <= 0)//已過期 可獲取
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();//如果不是leader則進入等待狀態,直到之前的leader被釋放後被喚醒
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;//當前獲取佇列元素的執行緒
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;//執行緒獲取到元素後釋放leader引用
}
}
}
}
} finally {
if (leader == null && q.peek() != null)//leader已被釋放 && 下個結點存在
available.signal();//leader執行緒獲取了元素 並且釋放了leader引用,退出方法前喚醒其他執行緒。
lock.unlock();
}
}
小結
SynchronousQueue:
這個佇列不儲存元素,當一個執行緒向這個佇列插入一個元素,另一個佇列需要立刻從這個佇列裡取出,否則無法繼續插入元素。適合傳遞型場景。
LinkedTransferQueue:
一個由連結串列構成的無界阻塞佇列
LinkedBlockingDeque:
一個連結串列結構的 雙向阻塞佇列。可以滿足兩個執行緒分別從頭尾進行插入或移除操作,應用於“工作竊取”演算法:允許一個執行緒從頭部插入\移除元素,另一個竊取執行緒從尾部竊取元素。