Java多執行緒/併發26、阻塞佇列BlockingQueue
BlockingQueue介面定義了一種佇列,這種佇列通常容量是提前固定(確定了容量大小)的。容量滿時往BlockingQueue中新增資料時會造成阻塞,容量為空時取元素操作會阻塞。
我們可以認為BlockingQueue佇列是一個水庫。水庫滿了的時侯,上游的水就要被攔住,不能再往水庫裡灌了。平時農莊澆灌,生活飲用都用這裡面的水。如果水庫幹了,那麼要灌溉的工人只能等著上游放水後,才能繼續工作。
實際上我們已經用Condition實現過一次了,見《利用Condition來實現阻塞佇列》。
BlockingQueue多用於生產-消費模式。還記得Future兌換月餅的例子嗎?裡面用到的CompletionService的實現用的就是一個儲存Future物件的BlockingQueue
在JUC中,BlockingQueue介面有以下實現:
ArrayBlockingQueue基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數putIndex和takeIndex,分別標識著佇列的頭部和尾部在陣列中的位置。
我們用ArrayBlockingQueue改寫一下領月餅的程式:
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
final BlockingQueue<Integer> abqueue=new ArrayBlockingQueue<Integer>(10);
/* 定義生產者:用來做月餅的Callable */
final Runnable runnable = new Runnable() {
public void run() {
try {
/*模擬耗時操作,需要5秒*/
Thread.sleep(5000 );
/*在佇列中放上一盒做好的月餅編號*/
abqueue.put(new Random().nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/*開啟執行緒B--消費者:獲取月餅*/
Runnable constmor_runnable=new Runnable() {
public void run() {
try {
ExecutorService tPool = Executors.newCachedThreadPool();
System.out.println("老闆,給我開始做月餅...");
/*啟動執行緒A--生產者:執行耗時操作,三條生產線開始生產月餅*/
for(int i=0;i<3;i++){
tPool.submit(runnable);
}
/*拿月餅*/
System.out.println("5秒鐘後......");
for(int i=0;i<3;i++){
/*生產月餅需要5秒,這時執行緒執行到take()這裡,還沒有月餅,因此就在這裡阻塞等待。*/
System.out.println("用月餅券兌換到月餅,該盒月餅編號:"+abqueue.take());
}
System.out.println("拿餅回家...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(constmor_runnable).start();
}
}
另外再說說LinkedBlockingQueue:
LinkedBlockingQueue是一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為Integer.MAX_VALUE。
它與ArrayBlockingQueue與使用一樣,上例中可以直接用LinkedBlockingDeque替換
附:
BlockingQueue介面常用方法:
放入資料:
- offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)
- offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
- put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.
獲取資料:
- poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;
- poll(long timeout, TimeUnitunit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
- take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入;
- drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。