1. 程式人生 > >JAVA中常見的阻塞佇列詳解

JAVA中常見的阻塞佇列詳解

![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085210744-2073386236.jpg) * 在之前的執行緒池的介紹中我們看到了很多阻塞佇列,這篇文章我們主要來說說阻塞佇列的事。 * 阻塞佇列也就是 `BlockingQueue` ,這個類是一個接 * 口,同時繼承了 `Queue` 介面,這兩個介面都是在`JDK5` 中加入的 。 * `BlockingQueue` 阻塞佇列是執行緒安全的,在我們業務中是會經常頻繁使用到的,如典型的生產者消費的場景,生產者只需要向佇列中新增,而消費者負責從佇列中獲取。 ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085212100-48677601.png) * 如上圖展示,我們生產者執行緒不斷的`put` 元素到佇列,而消費者從中`take` 出元素處理,這樣實現了任務與執行任務類之間的解耦,任務都被放入到了阻塞佇列中,這樣生產者和消費者之間就不會直接相互訪問實現了隔離提高了安全性。 ## 併發佇列 ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085212495-587651545.png) * 上面是 `Java` 中佇列`Queue` 類的類圖,我們可以看到它分為兩大類,**阻塞佇列與非阻塞佇列** * 阻塞佇列的實現介面是 `BlockingQueue` 而非阻塞佇列的介面是 `ConcurrentLinkedQueue` , 本文主要介紹阻塞佇列,非阻塞佇列不再過多闡述 * `BlockingQueue` 主要有下面六個實現類,分別是 `ArrayBlockingQueue`、`LinkedBlockingQueue`、`SynchronousQueue`、`DelayQueue`、`PriorityBlockingQueue`、`LinkedTransferQueue` 。這些阻塞佇列有著各自的特點和適用場景,後面詳細介紹。 * 非阻塞佇列的典型例子如 `ConcurrentLinkedQueue` , 它不會阻塞執行緒,而是利用了 `CAS` 來保證執行緒的安全。 * 其實還有一個佇列和 `Queue` 關係很緊密,那就是`Deque`,這其實是 `double-ended-queue` 的縮寫,意思是雙端佇列。它的特點是從頭部和尾部都能新增和刪除元素,而我們常見的普通佇列`Queue` 則是隻能一端進一端出,即`FIFO` 。 ## 阻塞佇列特點 * 阻塞佇列的特點就在於阻塞,它可以阻塞執行緒,讓生產者消費者得以平衡,阻塞佇列中有兩個關鍵方法 `Put` 和 `Take` 方法 **take方法** * `take `方法的功能是獲取並移除佇列的頭結點,通常在佇列裡有資料的時候是可以正常移除的。可是一旦執行 `take` 方法的時候,佇列裡無資料,則阻塞,直到佇列裡有資料。一旦佇列裡有資料了,就會立刻解除阻塞狀態,並且取到資料。過程如圖所示: ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085213625-1488568057.png) **put方法** * `put `方法插入元素時,如果佇列沒有滿,那就和普通的插入一樣是正常的插入,但是如果佇列已滿,那麼就無法繼續插入,則阻塞,直到佇列裡有了空閒空間。如果後續佇列有了空閒空間,比如消費者消費了一個元素,那麼此時佇列就會解除阻塞狀態,並把需要新增的資料新增到佇列中。過程如圖所示: ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085214330-1428051951.png) **是否有界(容量有多大)** * 此外,阻塞佇列還有一個非常重要的屬性,那就是容量的大小,分為有界和無界兩種。 * 無界佇列意味著裡面可以容納非常多的元素,例如 `LinkedBlockingQueue `的上限是 `Integer.MAX_VALUE`,約為 2 的 31 次方,是非常大的一個數,可以近似認為是無限容量,因為我們幾乎無法把這個容量裝滿。 * 但是有的阻塞佇列是有界的,例如 `ArrayBlockingQueue `如果容量滿了,也不會擴容,所以一旦滿了就無法再往裡放資料了。 ## 阻塞佇列常見方法 * 首先我們從常用的方法出發,根據各自的特點我們可以大致分為三個大類,如下表所示: | **分類** | **方法** | **含義** | **特點** | | :---: | :---: | :---: | :---: | | 丟擲異常 | add | 新增一個元素 | 如果佇列已滿,新增則丟擲  `IllegalStateException` 異常 | | | remove | 刪除佇列頭節點 | 當佇列為空後,刪除則丟擲  `NoSuchElementException` 異常 | | | element | 獲取佇列頭元素 | 當佇列為空時,則丟擲 `NoSuchElementException` 異常 | | 返回無異常 | offer | 新增一個元素 | 當佇列已滿,不會報異常,返回  `false` ,如果成功返回 `true` | | | poll | 獲取佇列頭節點,並且刪除它 | 當佇列空時,返回  `Null`   | | | peek | 單純獲取頭節點 | 當佇列為空時反饋 `NULL` | | 阻塞 | put | 新增一個元素 | 如果佇列已滿則阻塞 | | | take | 返回並刪除頭元素 | 如果佇列為空則阻塞 | * 如上面所示主要的八個方法,相對都比較簡單,下面我們通過實際程式碼演示的方式來認識 ### 拋異常型別[add、remove、element] #### add * 向佇列中新增一個元素。如果佇列是有界佇列,當佇列已滿時再新增則丟擲異常提示,如下: ``` java BlockingQueue queue = new ArrayBlockingQueue(2); queue.add(1); queue.add(2); queue.add(3); ``` * 上述程式碼中我們建立了一個阻塞佇列容量為2,當我們使用 `add` 向其中新增元素,當新增到第三個時則會丟擲異常如下: ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085214949-814882568.png) #### remove * `remove` 方法是從佇列中刪除佇列的頭節點,同時會返回該元素。當佇列中為空時執行 `remove` 方法時則會丟擲異常,程式碼如下: ``` java private static void groupRemove() { BlockingQueue queue = new ArrayBlockingQueue(2); queue.add("i-code.online"); System.out.println(queue.remove()); System.out.println(queue.remove()); } ``` * 上述程式碼中,我們可以看到,我們想佇列中添加了一個元素 `i-code.online` , 之後通過 `remove` 方法進行刪除,當執行第二次`remove` 時佇列內已無元素,則丟擲異常。如下: ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085215221-175516982.png) #### element * `element` 方法是獲取佇列的頭元素,但是並不是刪除該元素,這也是與 `remove` 的區別,當佇列中沒有元素後我們再執行 `element` 方法時則會丟擲異常,程式碼如下: ``` java private static void groupElement() { BlockingQueue queue = new ArrayBlockingQueue(2); queue.add("i-code.online"); System.out.println(queue.element()); System.out.println(queue.element()); } private static void groupElement2() { BlockingQueue queue = new ArrayBlockingQueue(2); System.out.println(queue.element()); } ``` * 上面兩個方法分別演示了在有元素和無元素的情況`element` 的使用。在第一個方法中並不會報錯,因為首元素一直存在的,第二個方法中因為空的,所以丟擲異常,如下結果: ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085215795-1657815880.png) ### 無異常型別[offer、poll、peek] #### offer * `offer` 方法是向佇列中新增元素, 同時反饋成功與失敗,如果失敗則返回 `false` ,當佇列已滿時繼續新增則會失敗,程式碼如下: ``` java private static void groupOffer() { BlockingQueue queue = new ArrayBlockingQueue(2); System.out.println(queue.offer("i-code.online")); System.out.println(queue.offer("雲棲簡碼")); System.out.println(queue.offer("AnonyStar")); } ``` * 如上述程式碼所示,我們向一個容量為2的佇列中通過`offer` 新增元素,當新增第三個時,則會反饋 `false` ,如下結果: ``` true true false ``` #### poll * `poll` 方法對應上面 `remove` 方法,兩者的區別就在於是否會在無元素情況下丟擲異常,`poll` 方法在無元素時不會丟擲異常而是返回`null` ,如下程式碼: ``` java private static void groupPoll() { BlockingQueue queue = new ArrayBlockingQueue(2); System.out.println(queue.offer("雲棲簡碼")); //新增元素 System.out.println(queue.poll()); //取出頭元素並且刪除 System.out.println(queue.poll()); } ``` * 上面程式碼中我們建立一個容量為2的佇列,並新增一個元素,之後呼叫兩次`poll`方法來獲取並刪除頭節點,發現第二次呼叫時為`null` ,因為佇列中已經為空了,如下: ``` true 雲棲簡碼 null ``` #### peek * `peek` 方法與前面的 `element` 方法是對應的 ,獲取元素頭節點但不刪除,與其不同的在於`peek` 方法在空佇列下並不會丟擲異常,而是返回 `null`,如下: ``` java private static void groupPeek() { BlockingQueue queue = new ArrayBlockingQueue(2); System.out.println(queue.offer(1)); System.out.println(queue.peek()); System.out.println(queue.peek()); } private static void groupPeek2() { BlockingQueue queue = new ArrayBlockingQueue(2); System.out.println(queue.peek()); } ``` * 如上述程式碼所示,我麼們分別展示了非空佇列與空佇列下`peek` 的使用,結果如下: ![](https://img2020.cnblogs.com/other/2024393/202011/2024393-20201116085216114-1907720397.png) ### 阻塞型別[put、take] #### put * `put` 方法是向佇列中新增一個元素,這個方法是阻塞的,也就是說當佇列已經滿的情況下,再`put`元素時則會阻塞,直到佇列中有空位. #### take * `take` 方法是從佇列中獲取頭節點並且將其移除,這也是一個阻塞方法,當佇列中已經沒有元素時,`take` 方法則會進入阻塞狀態,直到佇列中有新的元素進入。 ## 常見的阻塞佇列 ### ArrayBlockingQueue * `ArrayBlockingQueue` 是一個我們常用的典型的**有界佇列**,其內部的實現是基於陣列來實現的,我們在建立時需要指定其長度,它的執行緒安全性由 `ReentrantLock` 來實現的。 ``` java public ArrayBlockingQueue(int capacity) {...} public ArrayBlockingQueue(int capacity, boolean fair) {...} ``` * 如上所示,`ArrayBlockingQueue` 提供的建構函式中,我們需要指定佇列的長度,同時我們也可以設定佇列是都是公平的,當我們設定了容量後就不能再修改了,符合陣列的特性,此佇列按照先進先出(`FIFO`)的原則對元素進行排序。 * 和 `ReentrantLock `一樣,如果 `ArrayBlockingQueue `被設定為非公平的,那麼就存在插隊的可能;如果設定為公平的,那麼等待了最長時間的執行緒會被優先處理,其他執行緒不允許插隊,不過這樣的公平策略同時會帶來一定的效能損耗,因為非公平的吞吐量通常會高於公平的情況。 ### LinkedBlockingQueue * 從它的名字我們可以知道,它是一個由連結串列實現的佇列,這個佇列的長度是 `Integer.MAX_VALUE` ,這個值是非常大的,幾乎無法達到,對此我們可以認為這個佇列基本屬於一個無界佇列(也又認為是有界佇列)。此佇列按照先進先出的順序進行排序。 ### SynchronousQueue * `synchronousQueue` 是一個不儲存任何元素的阻塞佇列,每一個`put`操作必須等待`take`操作,否則不能新增元素。同時它也支援公平鎖和非公平鎖。 * `synchronousQueue` 的容量並不是1,而是0。因為它本身不會持有任何元素,它是直接傳遞的,`synchronousQueue` 會把元素從生產者直接傳遞給消費者,在這個過程中能夠是不需要儲存的 * 在我們之前介紹過的執行緒池 `CachedThreadPool` 就是利用了該佇列。`Executors.newCachedThreadPool()`,因為這個執行緒池它的最大執行緒數是`Integer.MAX_VALUE`,它是更具需求來建立執行緒,所有的執行緒都是臨時執行緒,使用完後空閒60秒則被回收, ### PriorityBlockingQueue * `PriorityBlockingQueue `是一個支援優先順序排序的無界阻塞佇列,可以通過自定義實現 `compareTo() `方法來指定元素的排序規則,或者通過構造器引數 `Comparator `來指定排序規則。**但是需要注意插入佇列的物件必須是可比較大小的,也就是 `Comparable `的,否則會丟擲 `ClassCastException `異常。** * 它的 `take `方法在佇列為空的時候會阻塞,但是正因為它是無界佇列,而且會自動擴容,所以它的佇列永遠不會滿,所以它的 `put `方法永遠不會阻塞,新增操作始終都會成功 ### DelayQueue * `DelayQueue` 是一個實現`PriorityBlockingQueue`的延遲獲取的無界佇列。具有“延遲”的功能。 * `DelayQueue` 應用場景:1. 快取系統的設計:可以用`DelayQueue`儲存快取元素的有效期,使用一個執行緒迴圈查詢`DelayQueue`,一旦能從`DelayQueue`中獲取元素時,表示快取有效期到了。2. 定時任務排程。使用`DelayQueue`儲存當天將會執行的任務和執行時間,一旦從`DelayQueue`中獲取到任務就開始執行,從比如`TimerQueue`就是使用`DelayQueue`實現的。 * 它是無界佇列,放入的元素必須實現 `Delayed `介面,而 `Delayed `介面又繼承了 `Comparable `介面,所以自然就擁有了比較和排序的能力,程式碼如下: ``` java public interface Delayed extends Co