1. 程式人生 > >Java併發程式設計:阻塞佇列BlockingQueue

Java併發程式設計:阻塞佇列BlockingQueue

阻塞佇列BlockingQueue簡介

阻塞佇列BlockingQueue是JDK1.5併發新特性中的內容,阻塞佇列首先是一個佇列,同樣實現了Collection介面。阻塞佇列提供了可阻塞的put和take方法,以及支援定時的poll和offer方法。

阻塞佇列跟普通佇列相比,首頁它是執行緒安全的,另外還提供了兩個附加操作:當佇列為空時,從佇列中獲取元素的操作將被阻塞;當佇列填滿是,向佇列新增元素將被阻塞。這兩個附加操作分別由BlockingQueue提供的兩個take和put方法支援。如果佇列已經滿了,那麼put方法將被阻塞直到有空間可用;如果佇列為空,那麼take方法將被阻塞直到有元素可用。佇列可以是有界的也可以是無界的,無界佇列永遠不會充滿,因此在無界佇列上面put方法也永遠不會被阻塞。

BlockingQueue提供了4中型別的處理方法:

方法\處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 丟擲異常: 當佇列滿時,再向佇列中插入元素,則會丟擲IllegalStateException異常。當佇列空時,再向佇列中獲取元素,則會丟擲NoSuchElementException異常。
  • 返回特殊值: 當佇列滿時,向佇列中新增元素,則返回false,否則返回true。當佇列為空時,向佇列中獲取元素,則返回null,否則返回元素。
  • 一直阻塞: 當阻塞佇列滿時,如果生產者向佇列中插入元素,則佇列會一直阻塞當前執行緒,直到佇列可用或響應中斷退出。當阻塞佇列為空時,如果消費者執行緒向阻塞佇列中獲取資料,則佇列會一直阻塞當前執行緒,直到佇列空閒或響應中斷退出。
  • 超時退出: 當佇列滿時,如果生產執行緒向佇列中新增元素,則佇列會阻塞生產執行緒一段時間,超過指定的時間則退出返回false。當佇列為空時,消費執行緒從佇列中移除元素,則佇列會阻塞一段時間,如果超過指定時間退出返回null。

Java提供的7個阻塞佇列

佇列 有界性 資料結構
ArrayBlockingQueue 有界 加鎖 arraylist
LinkedBlockingQueue 可選有界 加鎖 單向linkedlist
PriorityBlockingQueue 無界 加鎖 Heap
DelayQueue 無界 加鎖 Heap
SynchronousQueue 有界 無鎖(JDK1.6) ~
LinkedTransferQueue 無界 無鎖 單向linkedlist
LinkedBlockingDeque 可選有界 加鎖 雙向linkedlist

在多執行緒環境中,通過佇列可以很容易的實現資料共享。在基於佇列的生產者-消費者模型中,資料生產時,生產者就把資料放入佇列,當消費者準備使用資料時就從佇列中取出資料。生產者不需要知道消費者的標識或者數量,或者他們是唯一的生產者。同樣,消費者也不需要知道生產者來自何處。BlockingQqueue簡化了生產者-消費者的過程,它支援任意數量的生產者-消費者。一種最常見的生產者-消費者模式就是執行緒池與工作佇列的組合,在Executor執行框架中就體現了這種模式。

阻塞佇列BlockingQueue的成員介紹

ArrayBlockingQueue

ArrayBlockingQueue是一個基於陣列的阻塞佇列實現,內部維護了一個定長陣列,以便快取資料。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致放入操作受阻塞;試圖從空佇列中檢索元素將導致類似阻塞。ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。

  • ArrayBlockingQueue(int capacity) 建立一個帶有給定的(固定)容量和預設訪問策略(非公平鎖)的 ArrayBlockingQueue。capacity是佇列容量。
  • ArrayBlockingQueue(int capacity, boolean fair) 建立一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。fair訪問策略如果為 true,則按照 FIFO 順序訪問插入或移除時受阻塞執行緒的佇列,如果為 false,則訪問順序是不確定的。fair是“可重入的獨佔鎖(ReentrantLock)”的型別。fair為true,表示是公平鎖,fair為false,表示是非公平鎖。
  • ArrayBlockingQueue(int capacity, boolean fair, Collectionc) 建立一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,並以 collection 迭代器的遍歷順序新增元素。

由於ArrayBlockingQueue內部只維護一個ReentrantLock型別的lock鎖物件,所以在生成者-消費者模型中,並不能真正的實現並行,這一點不同於LinkedBlockingQueue,LinkedBlockingQueue內部維護了兩個鎖。事實上ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。

ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

LinkedBlockingQueue

LinkedBlockingQueue是一個單向連結串列實現的阻塞佇列,支援真正的並行操作,因為內部使用ReentrantLock實現插入鎖(putLock)和取出鎖(takeLock),維護了兩個所物件。其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。

在開發中新建LinkedBlockingQueue例項的時候,一般要指定其大小,如果沒有指定大小,大小預設是Integer.MAX_VALUE,這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。線上程池框架Executors中newSingleThreadExecutor和newFixedThreadPool方法內部維護的都是LinkedBlockingQueue。

PriorityBlockingQueue

PriorityBlockingQueue是一個按照優先順序排序的佇列,如果想要某個佇列不是按照FIFO的順序來處理元素,該佇列非常有用,內部維護一個堆的資料結構。PriorityBlockingQueue既可以根據元素的自然順序進行排序,如果元素實現了Comparable介面,也可以根據Comparator進行比較。該佇列看似有界佇列,實際上它會自動擴容,因此是無界佇列,因此在生產者-消費者模型中,生產者並不會真正的阻塞,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是非公平鎖。

DelayQueue

DelayQueue是一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即使無法使用 take 或 poll移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此佇列不允許使用 null 元素。

DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。

SynchronousQueue

SynchronousQueue是這樣一種阻塞佇列,其中每個 put 必須等待一個take,反之亦然。同步佇列沒有任何內部容量,甚至連一個佇列的容量都沒有,它不會為佇列中的元素維護儲存空間。與其它佇列不同的是,它維護一組執行緒,這些執行緒在等待著元素加入或者移除佇列。不能在同步佇列上進行peek,因為僅在試圖要取得元素時,該元素才存在;除非另一個執行緒試圖移除某個元素,否則也不能(使用任何方法)新增元素;也不能迭代佇列,因為其中沒有元素可用於迭代。佇列的頭是嘗試新增到佇列中的首個已排隊執行緒元素; 如果沒有已排隊執行緒,則不新增元素並且頭為null。SynchronousQueue類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。

SynchronousQueue的一個使用場景是線上程池裡。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個執行緒池根據需要(新任務到來時)建立新的執行緒,如果有空閒執行緒則會重複使用,執行緒空閒了60秒後會被回收。

建立SynchronousQueue有兩種構造方法,一種時SynchronousQueue(),預設採用非公平的形式,從JDK1.6開始SynchronousQueue的實現採用了一種效能更好的無鎖演算法。競爭機制支援公平和非公平兩種:非公平競爭模式使用的資料結構是後進先出棧(LIFO Stack);公平競爭模式則使用先進先出佇列(FIFO),效能上兩者是相當的,一般情況下,FIFO通常可以支援更大的吞吐量,但LIFO可以更大程度的保持執行緒的本地化。另外一種SynchronousQueue(boolean fair),可以自己指定訪問方式是否採用公平方式。

LinkedTransferQueue

LinkedTransferQueue是JDK1.7中新引入的佇列,該佇列的實現基於CAS無鎖機制,它也是一個基於連結串列實現的無界佇列。相比前面佇列它多transfer和tryTransfer方法。

LinkedBlockingDeque

LinkedBlockingDeque一個基於已連結節點的、任選範圍的阻塞雙端佇列。可選的容量範圍構造方法引數是一種防止過度膨脹的方式。如果未指定容量,那麼容量將等於 Integer.MAX_VALUE。只要插入元素不會使雙端佇列超出容量,每次插入後都將動態地建立連結節點。要想支援阻塞功能,佇列的容量一定是固定的,否則無法在入隊的時候掛起執行緒。也就是capacity是final型別的。

阻塞佇列示例

這是一個使用LinkedBlockedQueue設計實現的簡單的生產者-消費者模式。

public class Producer implements Runnable {
 
 private volatile boolean isRunning = true;
 private BlockingQueue<String> queue;
 private static final int DEFAULT_SLEEP = 1000;
 private static AtomicInteger count = new AtomicInteger();
 
 public Producer(BlockingQueue<String> queue) {
 this.queue = queue;
 }
 
 @Override
 public void run() {
 String data = null;
 Random r = new Random();
 
 System.out.println("啟動生產者執行緒!");
 try {
 while (isRunning) {
 Thread.sleep(r.nextInt(DEFAULT_SLEEP));
 data = "data:" + count.incrementAndGet();
 queue.put(data);
 System.out.println("將資料:" + data + "放入佇列...");
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {
 System.out.println("退出生產者執行緒!");
 }
 }
 
 public void stop() {
 isRunning = false;
 }
 
}
public class Consumer implements Runnable {
 
 private volatile boolean isRunning = true;
 private BlockingQueue<String> queue;
 private static final int DEFAULT_SLEEP = 1000;
 
 public Consumer(BlockingQueue<String> queue) {
 this.queue = queue;
 }
 
 @Override
 public void run() {
 System.out.println("啟動消費者執行緒!");
 Random r = new Random();
 try {
 while (isRunning) {
 String data = queue.take();
 if (null != data) {
 System.out.println("正在消費:" + data);
 Thread.sleep(r.nextInt(DEFAULT_SLEEP));
 }
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {
 System.out.println("退出消費者執行緒!");
 }
 }
 
 public void stop() {
 isRunning = false;
 }
 
}
public class MainTest {
 
 public static void main(String[] args) throws InterruptedException {
 // 宣告一個容量為10的快取佇列
 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 
 Producer producer1 = new Producer(queue);
 Producer producer2 = new Producer(queue);
 Producer producer3 = new Producer(queue);
 Consumer consumer = new Consumer(queue);
 
 // 藉助Executors
 ExecutorService service = Executors.newCachedThreadPool();
 // 啟動執行緒
 service.execute(producer1);
 service.execute(producer2);
 service.execute(producer3);
 service.execute(consumer);
 
 // 執行20s
 Thread.sleep(20 * 1000);
 producer1.stop();
 producer2.stop();
 producer3.stop();
 
 Thread.sleep(2 * 1000);
 consumer.stop();
 // 退出Executor
 service.shutdown();
 }
 
}

如果不使用阻塞佇列,使用Object.wait()和Object.notify()非阻塞佇列實現生產者-消費者模式,生產者執行緒在緩衝區為滿的時候,消費者在緩衝區為空的時候,都應該暫停執行。然後用notify 和notifyAll通知等待中的執行緒重新開始執行。

相關推薦

Java併發程式設計阻塞佇列BlockingQueue

阻塞佇列BlockingQueue簡介 阻塞佇列BlockingQueue是JDK1.5併發新特性中的內容,阻塞佇列首先是一個佇列,同樣實現了Collection介面。阻塞佇列提供了可阻塞的put和take方法,以及支援定時的poll和offer方法。 阻塞佇列跟普通

Java併發(十八):阻塞佇列BlockingQueue BlockingQueue阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue阻塞佇列)詳解

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者

JAVA併發程式設計阻塞佇列-ArrayBlockingQueue

生活 有很多的不快樂,其實是源自不滿足,而不滿足,很多時候是源自於心不定,而心不定則是因為不清楚究竟自己要什麼,不清楚要什麼的結果就是什麼都想要,結果什麼都沒得到。 生產者消費者模式 生產者和消費者問題是執行緒模型中一個經典問題: 生產者和消費者在同一個時間段內共用一塊記憶體區域

JAVA併發程式設計阻塞佇列-DelayQueue

生活 如果第一次你沒有成功,那麼稱之為1.0版,繼續加油。 DelayQueue的成員組成 今天來學習延時佇列,這個玩意兒也是非常重要,在定時器上有用到。 首先簡單瞭解下,延時佇列就是讓指定的資料再指定的時候以後出隊,也就是按照時間排序,因此它的核心確實是使用了昨天看的優先佇列。

12-Java併發程式設計阻塞佇列

Java併發程式設計:阻塞佇列   在前面幾篇文章中,我們討論了同步容器(Hashtable、Vector),也討論了併發容器(ConcurrentHashMap、CopyOnWriteArrayList),這些工具都為我們編寫多執行緒程式提供了很大的方便。今天我們來討

Java併發程式設計阻塞佇列和CountDownLatch

前幾天看到一個面試題目:有一個長度為2000的字串,開三個執行緒去判斷字串中”u51”的個數。 當時看到這個題目的時候,對併發程式設計是沒有什麼經驗的,在實際專案多執行緒的應用也只有一兩次。最近在惡補《Java併發程式設計的藝術》,對這個題目就有了解題的思路了。在這裡記錄一下對該題的

java併發-特大疑問-阻塞佇列(BlockingQueue)

java併發-阻塞佇列(BlockingQueue) 何為阻塞佇列 A {@link java.util.Queue} that additionally supports operations that wait for the queue to become non-

Java併發6阻塞佇列,Fork/Join框架

阻塞佇列 阻塞佇列是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法: 支援阻塞的插入方法:佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿 支援阻塞的移除方法:佇列空時,獲取元素的執行緒會等待佇列變為非空 阻塞佇列常用於生產者消費者的場景。其中生產者是向佇列新增元素

Java併發(二十一)執行緒池實現原理 Java併發(十八)阻塞佇列BlockingQueue Java併發(十八)阻塞佇列BlockingQueue Java併發程式設計執行緒池的使用

一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) Executor:位於最頂層,只有一個 execute(Runnab

Java併發程式設計4種執行緒池和緩衝佇列BlockingQueue

一. 執行緒池簡介 1. 執行緒池的概念:           執行緒池就是首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一

JAVA併發程式設計 PriorityQueue -》阻塞佇列 PriorityBlockingQueue

生活 一旦一種新技術開始滾動碾壓道路,如果你不能成為壓路機的一部分,那麼你就只能成為道路的一部分。 PriorityQueue 阻塞佇列裡的PriorityBlockingQueue基於PriorityQueue,所以在研究PriorityBlockingQueue之前要先研究一

Java併發(十八)阻塞佇列BlockingQueue

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放

Java併發包原始碼學習系列阻塞佇列BlockingQueue及實現原理分析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發包阻塞佇列BlockingQueue

BlockingQueue 在java.util.concurrent包中的 BlockingQueue介面類是一種執行緒安全的佇列。這篇文章我們將展示如何使用BlockingQueue。 這篇文章不討論BlockQueue的實現。如果你對此感興趣,有一片理論

java執行緒學習(九)阻塞佇列BlockingQueue講解

上一章中學到了執行緒池的詳細使用以及核心執行緒池的部分原始碼,其中就包含有BlockingQueue的資訊,那麼到底BlockingQueue是什麼呢,有什麼用呢,本章就是學這個的。 Blocking翻譯過來為’阻塞’,Queue就是佇列的意思,那麼BlockingQueue就是阻塞隊列了,

Java多執行緒/併發26、阻塞佇列BlockingQueue

BlockingQueue介面定義了一種佇列,這種佇列通常容量是提前固定(確定了容量大小)的。容量滿時往BlockingQueue中新增資料時會造成阻塞,容量為空時取元素操作會阻塞。 我們可以認為BlockingQueue佇列是一個水庫。水庫滿了的時侯,上游的

【轉】Java併發程式設計同步容器

  為了方便編寫出執行緒安全的程式,Java裡面提供了一些執行緒安全類和併發工具,比如:同步容器、併發容器、阻塞佇列、Synchronizer(比如CountDownLatch)。今天我們就來討論下同步容器。   一、為什麼會出現同步容器?   在Java的集合容器框架中,主要有四大類別:Li

JAVA併發程式設計一文全面搞懂併發程式設計

序言 哈哈哈哈哈哈,原諒我這個標題黨哈,我現在也只是剛入門併發程式設計,學習的過程過程中發現好多專業詞語不會讀或者是讀不準。。所以就彙總了下,把一些比較難讀的給標上英標啦。。 正文 callable:['kɔ:ləbl]  一個類似runnable的介面,方法可以有返回值

Java併發程式設計自己動手寫一把可重入鎖

關於執行緒安全的例子,我前面的文章Java併發程式設計:執行緒安全和ThreadLocal裡面提到了,簡而言之就是多個執行緒在同時訪問和修改公共資源的時候,由於不同執行緒搶佔CPU問題而導致的結果不確定性,就是在併發程式設計中經常要考慮的執行緒安全問題。前面的做法是使用同步語句synch

Java併發程式設計用AQS寫一把可重入鎖

前一篇部落格Java併發程式設計:自己動手寫一把可重入鎖詳述瞭如何用synchronized同步的方式來實現一把可重入鎖,今天我們來效仿ReentrantLock類用AQS來改寫一下這把鎖。要想使用AQS為我們服務,首先得弄懂三個問題:AQS是什麼?AQS已經做了什麼以及我們還需要做些什