阻塞佇列與非阻塞佇列區別
最後有佇列的簡單使用例子
-----------------------------------------------------------------
在併發程式設計中,有時候需要使用執行緒安全的佇列。如果要實現一個執行緒安全的佇列有兩種方式:一種是使用阻塞演算法,另一種是使用非阻塞演算法。
//使用阻塞演算法的佇列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現。非阻塞的實現方式則可以使用迴圈CAS的方式來實現。
阻塞佇列:
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。
1)支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。
2)支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
阻塞佇列常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。
下表是阻塞佇列的部分方法:
方法\處理方式 | 丟擲異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
- 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
- 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
- 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
- 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。
JDK7提供了7個阻塞佇列。分別是:
- ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
- LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
- PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
- DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
- SynchronousQueue:一個不儲存元素的阻塞佇列。
- LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
- LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。
簡單介紹下其中三個佇列:
SynchronousQueue
SynchronousQueue是無界的,是一種無緩衝的等待佇列,但是由於該Queue本身的特性,在某次新增元素後必須等待其他執行緒取走後才能繼續新增;可以認為SynchronousQueue是一個快取值為1的阻塞佇列,但是 isEmpty()方法永遠返回是true,remainingCapacity() 方法永遠返回是0,remove()和removeAll() 方法永遠返回是false,iterator()方法永遠返回空,peek()方法永遠返回null。
宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。
LinkedBlockingQueue
LinkedBlockingQueue是無界的,是一個無界快取的等待佇列。
基於連結串列的阻塞佇列,內部維持著一個數據緩衝佇列(該佇列由連結串列構成)。當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。
LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
ArrayListBlockingQueue
ArrayListBlockingQueue是有界的,是一個有界快取的等待佇列。
基於陣列的阻塞佇列,同LinkedBlockingQueue類似,內部維持著一個定長資料緩衝佇列(該佇列由陣列構成)。ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。
非阻塞佇列:
ConcurrentLinkedQueue
ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部;當我們獲取一個元素時,它會返回佇列頭部的元素。
結構如下:
-
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
-
implements Queue<E>, java.io.Serializable {
-
private transient volatile Node<E> head;//頭指標
-
private transient volatile Node<E> tail;//尾指標
-
public ConcurrentLinkedQueue() {//初始化,head=tail=(一個空的頭結點)
-
head = tail = new Node<E>(null);
-
}
-
private static class Node<E> {
-
volatile E item;
-
volatile Node<E> next;//內部是使用單向連結串列實現
-
......
-
}
-
......
-
}
入隊和出隊操作均利用CAS(compare and set)更新,這樣允許多個執行緒併發執行,並且不會因為加鎖而阻塞執行緒,使得併發效能更好。
------------------------------------------------------------------------------------------------------------------
Java 例項 - 佇列(Queue)用法
佇列是一種特殊的線性表,它只允許在表的前端進行刪除操作,而在表的後端進行插入操作。
LinkedList類實現了Queue介面,因此我們可以把LinkedList當成Queue來用。
import java.util.LinkedList;
import java.util.Queue;
public class Main {
public static void main(String[] args) {
//add()和remove()方法在失敗的時候會丟擲異常(不推薦)
Queue<String> queue = new LinkedList<String>();
//新增元素
queue.offer("a");
queue.offer("b");
queue.offer("c");
queue.offer("d");
queue.offer("e");
for(String q : queue){
System.out.println(q);
}
System.out.println("===");
System.out.println("poll="+queue.poll()); //返回第一個元素,並在佇列中刪除
for(String q : queue){
System.out.println(q);
}
System.out.println("===");
System.out.println("element="+queue.element()); //返回第一個元素
for(String q : queue){
System.out.println(q);
}
System.out.println("===");
System.out.println("peek="+queue.peek()); //返回第一個元素
for(String q : queue){
System.out.println(q);
}
}
}
以上程式碼執行輸出結果為:
a b c d e === poll=a b c d e === element=b b c d e === peek=b b c d e