執行緒基礎(二十二)-併發容器-ArrayBlockingQueue(下)
本文作者:王一飛,叩丁狼高階講師。原創文章,轉載請註明出處。
概念
ArrayBlockingQueue 是一個有界阻塞的佇列。有界原因是它底層維護了一個數組,初始化時,可以直接指定。要注意,一旦建立成功後,陣列將無法進行再擴容。而阻塞是因為它對入列出列做了加鎖處理,如果佇列滿了,再入列則需要阻塞等待, 如果佇列是空的,出列時也需要阻塞等待。
ArrayBlockingQueue 底層是一個有界陣列,遵循FIFO原則,對進入的元素進行排序,先進先出。
ArrayBlockingQueue 使用ReentrantLock鎖,再配合兩種Condition實現佇列的執行緒安全操作。併發環境下ArrayBlockingQueue 使用頻率較高
ArrayBlockingQueue 支援公平與非公平2種操作策略,在建立物件時通過建構函式將fair引數設定為true/false即可,需要注意的是,如果fair設定為false,表示持有公平鎖,這種操作會降低系統吞吐量,慎用。
外部結構
內部結構
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; //存放元素陣列 final ReentrantLock lock; //互斥鎖物件 private final Condition notEmpty; //非空條件變數 private final Condition notFull; //非滿條件變數 .... }
從內部結構原始碼上看,ArrayBlockingQueue 內部維護一個final陣列,當佇列初始化後將無法再進行拓展,保證佇列的有界性。lock 互斥鎖,在出隊入隊中保證執行緒的安全。而notEmpty 跟 notFull 條件變數保證佇列在滿隊時入隊等待, 當佇列空列時,出隊等待。
初始化
//引數1:佇列初始長度 //引數2:是否為公平佇列 fasle: 是, true 不是 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//引數3:佇列初始化元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
.....
}
ArrayBlockingQueue 有3個構造器,核心是2個引數的構造器, capacity表示佇列初始化長度, fair 指定ArrayBlockingQueue是公平佇列還是非公平佇列。
入列
ArrayBlockingQueue 入列方式有大體三種:
public class App {
public static void main(String[] args) throws InterruptedException {
//佇列長度為2
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//方式1:滿列拋異常
///System.out.println(queue.add("add")); //true
///System.out.println(queue.add("add")); //true
///System.out.println(queue.add("add")); //滿列異常
//方式2:滿列返回false,不阻塞
//System.out.println(queue.offer("offer")); //true
//System.out.println(queue.offer("offer")); //true
//System.out.println(queue.offer("offer")); //false
//方式3:滿列阻塞(推薦)
queue.put("put");
queue.put("put");
queue.put("put"); //滿列阻塞等待
}
}
這裡我們以put方法為例
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //取鎖: 執行緒執行中斷
try {
while (count == items.length)
notFull.await(); //佇列滿隊,需要暫停等待
enqueue(e); //入列
} finally {
lock.unlock(); //釋放鎖
}
}
在put方法開始前, 先獲取可中斷lock.lockInterruptibly(), 對put核心邏輯進行加鎖,當判斷到佇列已滿,阻塞當前執行緒。反之, 執行enqueue()實現入列邏輯。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; //入列
//putIndex 表示下一個入列所以, 如果為佇列長度, 下一個輪迴
//原因: 佇列為陣列, 操作所以從0開始
if (++putIndex == items.length)
putIndex = 0;
count++; //總數+1
notEmpty.signal(); //喚醒等待出列執行緒
}
進入enqueue之後, 因為該方法已經持有鎖,所以無法再進行鎖重入,在enqueue方法之後, 執行notEmpty.signal(); 喚醒出列等待執行緒。
出列
ArrayBlockingQueue 出列也對應的有3中方式
public class App {
public static void main(String[] args) throws InterruptedException {
//佇列長度為2
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
queue.put("admin");
queue.put("admin");
//方式1:空列出隊時,拋異常
//System.out.println(queue.remove());
//System.out.println(queue.remove());
//System.out.println(queue.remove()); //空列報異常
//方式2:空列出隊時,返回null
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll()); //空列返回null
//方式3:空列出隊時,阻塞(推薦)
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take()); //空列阻塞
}
}
這裡我們以take方法為例
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 隊長為0,需要暫停等待
return dequeue();
} finally {
lock.unlock();
}
}
跟put方法操作一樣, 進入方法之後, 先獲取鎖,再判斷佇列長度是否為0, 如果為0, 當前執行緒進入阻塞。反之,進入dequeue 方法執行出列操作。
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; //出列之後,原先佇列設定為null
//takeIndex 下一個出列的資料索引, 一個輪迴後,設定為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //喚醒等待入列執行緒
return x;
}
公平/非公平佇列
ArrayBlockingQueue 可以實現公平與非公平2種佇列, 公平隊列表示在併發環境下,如果佇列已經滿列了,入列執行緒按照FIFO的順序阻塞,等待召喚。非公平佇列就沒有這種規矩,誰先搶到,誰先入列。
來看一下例子:
需求:開啟10個執行緒往邊界為3的佇列新增資料, 同時開始一個執行緒不斷出列。
public class App {
public static void main(String[] args) throws InterruptedException {
//佇列長度為3
//公平佇列
ArrayBlockingQueue queue = new ArrayBlockingQueue(3, true);
for (int i= 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10)); //將問題放大
//執行緒進入
System.out.println("進入-"+ Thread.currentThread().getName());
//阻塞等待入列
queue.put("出列-" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t_" + i).start();
}
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
//按順序出列
System.out.println("------" + queue.take());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
}
進入-t_5
進入-t_1
------出列-t_5
------出列-t_1
進入-t_8
------出列-t_8
進入-t_7
------出列-t_7
進入-t_2
------出列-t_2
進入-t_9
------出列-t_9
進入-t_0
------出列-t_0
進入-t_3
進入-t_6
------出列-t_3
------出列-t_6
進入-t_4
------出列-t_4
觀察結果,發現進入順序跟出列順序一樣。公平佇列講究公平, 進入0到9執行緒啟動後,執行run方法,都能執行 “進入” 程式碼,但是入列的操作是阻塞的,同一時間點只允許一個執行緒進入。其他執行緒必須等待,那麼誰先列印 “進入” 程式碼,就表示誰先阻塞,依照公平FIFO原則,就應該誰先出列。 所以當進入順序與出列一致就把表示公平原則生效。
將引數改為false,我們再看列印結果
ArrayBlockingQueue queue = new ArrayBlockingQueue(3, false);
進入-t_3
進入-t_7
進入-t_6
進入-t_0
進入-t_4
進入-t_8
進入-t_5
進入-t_1
------出列-t_3
------出列-t_7
------出列-t_6
------出列-t_8
------出列-t_4
------出列-t_5
------出列-t_1
------出列-t_0
進入-t_9
------出列-t_9
進入-t_2
------出列-t_2
觀察, 很明顯進入與出列順序不一致,這就是非公平佇列。
注意: 10個執行緒效果不是太明顯,可以適當加大。
到這,本篇結束。
想獲取更多技術視訊,請前往叩丁狼官網:http://www.wolfcode.cn/all_article.html