1. 程式人生 > >BlockingQueue(阻塞佇列)詳解(一個生產者消費者的例項)

BlockingQueue(阻塞佇列)詳解(一個生產者消費者的例項)


注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。

一. 前言

  在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。

二. 認識BlockingQueue

  阻塞佇列,顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:

  從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;

  常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)

    先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。

    後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。  

多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒),

下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
       如上圖所示:當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。


   如上圖所示:當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。

  這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:

三. BlockingQueue的核心方法

  1.放入資料

    (1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法

 的執行緒);       
       (2)offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。

    (3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.

  2. 獲取資料

    (1)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;

    (2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間

超時還沒有資料可取,返回失敗。

    (3)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入; 

    (4)drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。

四. 常見BlockingQueue

  在瞭解了BlockingQueue的基本功能後,讓我們來看看BlockingQueue家庭大致有哪些成員?

  1. ArrayBlockingQueue

  基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。

  ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

  2.LinkedBlockingQueue

  基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。

  作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

  ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。

  下面的程式碼演示瞭如何使用BlockingQueue:

  (1) 測試類

複製程式碼
 1 import java.util.concurrent.BlockingQueue;
 2 import java.util.concurrent.ExecutorService;
 3 import java.util.concurrent.Executors;
 4 import java.util.concurrent.LinkedBlockingQueue; 
 5 
 6 public class BlockingQueueTest {
 7  
 8     public static void main(String[] args) throws InterruptedException {
 9         // 宣告一個容量為10的快取佇列
10         BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
11  
12         //new了三個生產者和一個消費者
13         Producer producer1 = new Producer(queue);
14         Producer producer2 = new Producer(queue);
15         Producer producer3 = new Producer(queue);
16         Consumer consumer = new Consumer(queue);
17  
18         // 藉助Executors
19         ExecutorService service = Executors.newCachedThreadPool();
20         // 啟動執行緒
21         service.execute(producer1);
22         service.execute(producer2);
23         service.execute(producer3);
24         service.execute(consumer);
25  
26         // 執行10s
27         Thread.sleep(10 * 1000);
28         producer1.stop();
29         producer2.stop();
30         producer3.stop();
31  
32         Thread.sleep(2000);
33         // 退出Executor
34         service.shutdown();
35     }
36 }
複製程式碼

  (2)生產者類

複製程式碼
 1 import java.util.Random;
 2 import java.util.concurrent.BlockingQueue;
 3 import java.util.concurrent.TimeUnit;
 4 import java.util.concurrent.atomic.AtomicInteger;
 5  
 6 /**
 7  * 生產者執行緒
 8  * 
 9  * @author jackyuj
10  */
11 public class Producer implements Runnable {
12     
13     private volatile boolean  isRunning = true;//是否在執行標誌
14     private BlockingQueue queue;//阻塞佇列
15     private static AtomicInteger count = new AtomicInteger();//自動更新的值
16     private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
17  
18     //建構函式
19     public Producer(BlockingQueue queue) {
20         this.queue = queue;
21     }
22  
23     public void run() {
24         String data = null;
25         Random r = new Random();
26  
27         System.out.println("啟動生產者執行緒!");
28         try {
29             while (isRunning) {
30                 System.out.println("正在生產資料...");
31                 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一個隨機數
32  
33                 data = "data:" + count.incrementAndGet();//以原子方式將count當前值加1
34                 System.out.println("將資料:" + data + "放入佇列...");
35                 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//設定的等待時間為2s,如果超過2s還沒加進去返回true
36                     System.out.println("放入資料失敗:" + data);
37                 }
38             }
39         } catch (InterruptedException e) {
40             e.printStackTrace();
41             Thread.currentThread().interrupt();
42         } finally {
43             System.out.println("退出生產者執行緒!");
44         }
45     }
46  
47     public void stop() {
48         isRunning = false;
49     }
50 }
複製程式碼

  (3)消費者類

複製程式碼
 1 import java.util.Random;
 2 import java.util.concurrent.BlockingQueue;
 3 import java.util.concurrent.TimeUnit;
 4  
 5 /**
 6  * 消費者執行緒
 7  * 
 8  * @author jackyuj
 9  */
10 public class Consumer implements Runnable {
11     
12     private BlockingQueue<String> queue;
13     private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
14  
15     //建構函式
16     public Consumer(BlockingQueue<String> queue) {
17         this.queue = queue;
18     }
19  
20     public void run() {
21         System.out.println("啟動消費者執行緒!");
22         Random r = new Random();
23         boolean isRunning = true;
24         try {
25             while (isRunning) {
26                 System.out.println("正從佇列獲取資料...");
27                 String data = queue.poll(2, TimeUnit.SECONDS);//有資料時直接從佇列的隊首取走,無資料時阻塞,在2s內有資料,取走,超過2s還沒資料,返回失敗
28                 if (null != data) {
29                     System.out.println("拿到資料:" + data);
30                     System.out.println("正在消費資料:" + data);
31                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
32                 } else {
33                     // 超過2s還沒資料,認為所有生產執行緒都已經退出,自動退出消費執行緒。
34                     isRunning = false;
35                 }
36             }
37         } catch (InterruptedException e) {
38             e.printStackTrace();
39             Thread.currentThread().interrupt();
40         } finally {
41             System.out.println("退出消費者執行緒!");
42         }
43     }
44  
45     
46 }
複製程式碼

  3. DelayQueue

  DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。

  使用場景:

  DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。

  4. PriorityBlockingQueue

   基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖。

  5. SynchronousQueue

   一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。

  宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:

  如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;

  但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。

五. 小結

  BlockingQueue不光實現了一個完整佇列所具有的基本功能,同時在多執行緒環境下,他還自動管理了多線間的自動等待於喚醒功能,從而使得程式設計師可以忽略這些細節,關注更高階的功能。


相關推薦

BlockingQueue阻塞佇列一個生產者消費者例項

注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。一. 前言  在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳

棧實現佇列,用兩個棧實現佇列方法含實現程式碼

棧怎樣才能實現和佇列一樣從棧的底層抽出元素呢?一般會用兩個棧來實現佇列。 首先,我們將兩個棧分別定義為 stack1 與 stack2。 實現方案 1 我們讓入隊操作在 stack1 中執行,而出隊操作在 stack2 中執行。執行方式如下。 入隊:直接向 stack1 中入棧。 出隊:將 stac

堆 堆排序 優先佇列 圖文Golang實現

引入 在實際應用中,我們經常需要從一組物件中查詢最大值或最小值。當然我們可以每次都先排序,然後再進行查詢,但是這種做法效率很低。哪麼有沒有一種特殊的資料結構,可以高效率的實現我們的需求呢,答案就是堆(heap) 堆分為最小堆和最大堆,它們的性質相似,我們以最小堆為例子。 最小堆 舉例 如上圖所示,就為一個

PCA 主成分分析 寫給初學者 結合matlab轉載

整數 變量 行為 保持 sum osc 入參 函數 data 一、簡介 PCA(Principal Components Analysis)即主成分分析,是圖像處理中經常用到的降維方法,大家知道,我們在處理有關數字圖像處理方面的問題時,比如經常用的圖像的查詢

GPIO輸入輸出各種模式推挽、開漏、準雙向端口

結構 inf 根據 action 存在 -i container 大致 信號線 GPIO輸入輸出各種模式(推挽、開漏、準雙向端口 概述 能將處理器的GPIO(General Purpose Input and Output)內部結構和各種模式徹底弄清

ASP.NET Core微服務 on K8SJessetalk第一章:基本物件及服務發現持續更新

課程連結:http://video.jessetalk.cn/course/explore 良心課程,大家一起來學習哈! 任務1:課程介紹 任務2:Labels and Selectors 所有資源物件(包括Pod, Service, Namespace, Volume)都可以打

JavaScript運算子操作符1----基本運算子

一、一元操作符 1.遞增(++)遞減(–)操作符 遞增遞減操作符都和C語言一樣,分為前置型和後置型。兩者區別在於及時返回值不同; var age = 18; age++; //19 a

LCTLink-Cut Tree蒟蒻自留地

 最近自學了LCT,發現網上的資料講解不是很全面,像我這樣的蒟蒻一時半會根本理解不了。我弄了很久總算是理解了LCT,打算總結一下LCT的基本操作,還請諸位神牛來找找茬。 如果你還沒有接觸過LCT,你

C++之函式物件/偽函式Function Object

       除了自定義的函式物件,標準庫還為我們提供了一系列現成的函式物件, 比如常見的數學、邏輯運算等。例如:negate<type>(),plus<type>(),minus<type>(),multiplies<type&g

Android中的服務service

1. 引言:在Android系統中,到處可見service(服務)這個單詞,從功能上來講,它意味著沒有UI介面,作為一個後臺程序,執行一些特定的任務。在Android應用開發過程中,也免不了需要開發一些service來完成一些功能,而這種應用層的service(繼承看自se

JMeter學習—006—JMeter 命令列非GUI模式-分散式遠端執行指令碼及檢視指定結果、日誌

JMeter分散式執行指令碼,以更好的達到預設的效能測試(併發)場景,前文解說了jmeter使用命令列執行各個引數的作用以及命令列使用範例,那麼此文就繼續前文,針對 JMeter 的命令列模式之分散式遠端執行模式進行詳細解說。一、應用場景 1、無需互動介面或受環境限制(l

資料結構之點陣圖bitmap

1.  概述 點陣圖(bitmap)是一種非常常用的結構,在索引,資料壓縮等方面有廣泛應用。本文介紹了點陣圖的實現方法及其應用場景。 2. 點陣圖實現 (1)自己實現 在點陣圖中,每個元素為“0”或“1”,表示其對應的元素不存在或者存在。 複製程式碼程式碼如

Java併發十八阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者

【轉載】BlockingQueue阻塞佇列

注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。 一. 前言   在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資

BlockingQueue阻塞佇列

注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。 一. 前言   在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高

生產者消費者BlockingQueue阻塞佇列

注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。 一. 前言   在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何

BlockingQueue阻塞隊列

明顯 緩存 thread 生產者消費者 演示 mce 生成 log spa 註意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,復制過來算是個積累,怕以後找不到。 一. 前言   在新增的Concu

Java 進階——多執行緒優化之執行緒池 ThreadPoolExecutor的核心容器阻塞佇列

#引言 多執行緒我想無論是後端開發,還是對於App開發者來說都不會陌生,何況Android強制要求不能在主執行緒中做網路請求,於是乎,在很多初學者或者App的原始碼中會出現會多的new Thread…的方式,這樣的程式碼是不優雅而且存在很多的隱患,假如說在使用者

Java多執行緒---阻塞佇列舉例說明

一. 前言   在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員

佇列實現棧,兩個佇列實現一個棧方法含實現程式碼

本節介紹一下如何用兩個佇列實現棧。 棧的主要操作就是入棧和出棧,其特點就是後進先出。我們先將兩個佇列分別定義為 queue1 與 queue2。 方案 1 入棧和出棧,都在 queue1 中完成,而 queue2 作為中轉空間。 入棧:直接入 queue1 即可。 出棧:把 queue1 中除最後一