1. 程式人生 > 實用技巧 >JUC中的阻塞佇列

JUC中的阻塞佇列

阻塞佇列的應用場景
阻塞佇列這塊的應用場景,比較多的仍然是對於生產者消 費者場景的應用,但是由於分散式架構的普及,是的大家 更多的關注在分散式訊息佇列上。所以其實如果把阻塞隊 列比作成分散式訊息佇列的話,那麼所謂的生產者和消費 者其實就是基於阻塞佇列的解耦。 另外,阻塞佇列是一個 fifo 的佇列,所以對於希望線上程 級別需要實現對目標服務的順序訪問的場景中,也可以使 用;

在Java8中,提供了7個阻塞佇列:

阻塞佇列的操作方法
在阻塞佇列中,提供了四種處理方式

1. 插入操作

add(e) :新增元素到佇列中,如果佇列滿了,繼續插入 元素會報錯,IllegalStateException。

offer(e) : 新增元素到佇列,同時會返回元素是否插入 成功的狀態,如果成功則返回true
put(e) :當阻塞佇列滿了以後,生產者繼續通過 put 新增元素,佇列會一直阻塞生產者執行緒,知道佇列可用

offer(e,time,unit) :當阻塞佇列滿了以後繼續新增元素, 生產者執行緒會被阻塞指定時間,如果超時,則執行緒直接 退出

2. 移除操作

remove():當佇列為空時,呼叫 remove 會返回 false, 如果元素移除成功,則返回true

poll(): 當佇列中存在元素,則從佇列中取出一個元素, 如果佇列為空,則直接返回null

take():基於阻塞的方式獲取佇列中的元素,如果佇列為 空,則take方法會一直阻塞,直到佇列中有新的資料可 以消費

poll(time,unit):帶超時機制的獲取資料,如果佇列為空, 則會等待指定的時間再去獲取元素返回

ArrayBlockingQueue 原理分析
構造方法
ArrayBlockingQueue提供了三個構造方法,分別如下。 capacity: 表示陣列的長度,也就是佇列的長度
fair:表示是否為公平的阻塞佇列,預設情況下構造的是非 公平的阻塞佇列。 其中第三個構造方法就不解釋了,它提供了接收一個幾個 作為資料初始化的方法;

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);//重入鎖,入隊和入隊持有同一把鎖
notEmpty = lock.newCondition();//初始化非空等待佇列
notFull = lock.newCondition();//初始化非滿等待佇列
}

Add 方法
以add方法作為入口,在add方法中會呼叫父類的add方 法,也就是 AbstractQueue.如果看原始碼看得比較多的話,
一般這種寫法都是呼叫父類的模版方法來解決通用性問題

AbstractQueue.add
//從父類的add方法可以看到,判斷佇列是否滿了,如果佇列滿了,則直接丟擲一個異常

public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer 方法
add方法最終還是呼叫offer方法來新增資料,返回一個添加成功或者失敗的布林值反饋 這段程式碼做了幾個事情
1. 判斷新增的資料是否為空
2. 新增重入鎖
3. 判斷佇列長度,如果佇列長度等於陣列長度,表示滿了 直接返回false
4. 否則,直接呼叫enqueue將元素新增到佇列中

public boolean offer(E e) {
checkNotNull(e);//對請求資料做判斷,若e為null,則丟擲異常
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)//判斷佇列是否滿了
return false;
else {
enqueue(e);//加入佇列
return true;
}
} finally {
lock.unlock();
}
}

enqueue :真正的資料新增操作
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;//通過putIndex對資料賦值
if (++putIndex == items.length)//當putIndex等於陣列長度時,將putIndex重置為0
putIndex = 0;
count++;//記錄佇列元素的個數
notEmpty.signal();//喚醒處於等待狀態下的執行緒,表示當前佇列中的元素不為空,如果存在消費者執行緒阻塞,就可以開始取出元素

}
putIndex為什麼會在等於數 組長度的時候重新設定為0?
因為ArrayBlockingQueue是一個FIFO的佇列,佇列新增 元素時,是從隊尾獲取putIndex來儲存元素,當putIndex 等於陣列長度時,
下次就需要從陣列頭部開始添加了。

下面這個圖模擬了新增到不同長度的元素時,putIndex的 變化,當putIndex等於陣列長度時,不可能讓putIndex繼 續累加,
否則會超出陣列初始化的容量大小。同時大家還 需要思考兩個問題 :
1. 當元素滿了以後是無法繼續新增的,因為會報錯
2. 其次,佇列中的元素肯定會有一個消費者執行緒通過take 或者其他方法來獲取資料,而獲取資料的同時元素也會 從佇列中移除

put方法

put 方法和 add 方法功能一樣,差異是 put 方法如果佇列 滿了,會阻塞。

public void put(E e) throws InterruptedException {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();//這個也是獲得鎖,但
//是和 lock 的區別是,這個方法優先允許在等待時由其他執行緒調
//用等待執行緒的 interrupt 方法來中斷等待直接返回。而 lock
//方法是嘗試獲得鎖成功後才響應中斷
  try {
    while (count == items.length)
    notFull.await();//佇列滿了的情況下,當前執行緒將會被 notFull 條件物件掛起加到等待佇列中,reentrantlock的實現原理
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

take 方法
take方法是一種阻塞獲取佇列中元素的方法 它的實現原理很簡單,有就刪除沒有就阻塞,注意這個阻 塞是可以中斷的,如果佇列沒有資料那麼就加入notEmpty 條件佇列等待(有資料就直接取走,方法結束),如果有新的 put執行緒添加了資料,那麼put操作將會喚醒take執行緒, 執行take操作。

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //如果佇列為空的情況
下,直接通過 await 方法阻塞
return dequeue();
} finally {
lock.unlock();
} }

如果佇列中添加了元素,那麼這個時候,會在enqueue中 呼叫notempty.signal喚醒take執行緒來獲得元素;

dequeue 方法
這個是出佇列的方法,主要是刪除佇列頭部的元素併發返 回給客戶端
takeIndex,是用來記錄拿資料的索引值

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//預設獲取0位置的元素
items[takeIndex] = null;//將該位置的元素設定為空
if (++takeIndex == items.length)//這裡的作用是,如果拿到陣列的最大值,那麼重置為0,繼續從頭部位置開始獲取資料
takeIndex = 0;
count--;//記錄元素個數遞減
if (itrs != null)
itrs.elementDequeued();//同時更新迭代器中的元素個數
notFull.signal();//觸發 因為佇列滿了以後導致的被阻塞的執行緒
return x;
}

remove方法
remove方法是移除一個指定元素:
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {//如果佇列不為空
final int putIndex = this.putIndex;//獲取下一個要新增元素時的索引
int i = takeIndex;//獲取當前要被移除的元素索引
do {
if (o.equals(items[i])) {//從takeIndex下標開始,找到要被刪除的元素
removeAt(i);//移除指定元素
return true;
}
          //當前刪除索引執行加1後判斷是否與陣列長度相等
          //若為true,說明索引已到陣列盡頭,將i設定為0

if (++i == items.length)
i = 0;
} while (i != putIndex);//繼續查詢,直到找到最後一個元素
}
return false;
} finally {
lock.unlock();
}
}