1. 程式人生 > >Java併發佇列與容器

Java併發佇列與容器

【前言:無論是大資料從業人員還是Java從業人員,掌握Java高併發和多執行緒是必備技能之一。本文主要闡述Java併發包下的阻塞佇列和併發容器,其實研讀過大資料相關技術如Spark、Storm等原始碼的,會發現它們底層大多用到了Java併發佇列、同步類容器、ReentrantLock等。建議大家結合本篇文章,仔細分析一下相關原始碼】

BlockingQueue

阻塞佇列,位於java.util.concurrent併發包下,它很好的解決了多執行緒中如何安全、高效的資料傳輸問題。所謂“阻塞”是指在某些情況下執行緒被掛起,當滿足一定條件時會被自動喚醒,可以通過API進行控制。

常見的阻塞佇列主要分為兩種FIFO(先進先出)和LIFO(後進先出),當然通過不同的實現方式,還可以引申出多種不同型別的佇列。首先了解一下BlockingQueue的幾個核心API:put、take一對阻塞存取;add、poll一對非阻塞存取。

插入資料

put(anObj):把anObj加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻塞,直到BlockingQueue裡面有空間再繼續插入

add(anObj):把anObj加到BlockingQueue裡,如果BlockingQueue可以容納,則返回true,否則丟擲異常

offer(anObj):表示如果可能的話,將anObj加到BlockingQueue裡,如果BlockingQueue可以容納,則返回true,否則返回false。

讀取資料

take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態,直到Blocking有新的物件被加入為止

poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null

BlockingQueue核心成員介紹

ArrayBlockingQueue

基於陣列實現的有界阻塞佇列。因為基於陣列實現,所以具有查詢快,增刪慢的特點。

生產者和消費者用的是同一把鎖,不能並行執行效率低。它底層使用了一種標準互斥鎖ReentrantLock,即讀讀、讀寫,寫寫都互斥,當然可以控制物件內部是否採用公平鎖,預設是非公平鎖。消費方式是FIFO。

生產和消費資料時,直接將列舉物件插入或刪除,不會產生或銷燬額外的物件例項。

應用:因為底層生產和消費用了同一把鎖,定長陣列不用頻繁建立和銷燬物件,適合於想按照佇列順序去執行任務,還不想出現頻繁的GC的場景。

 

LinkedBlockingQueue

基於連結串列實現的阻塞佇列,同樣具有增刪快,定位慢的特點。

需要注意一點:預設情況下建立的LinkedBlockingQueue容量是Integer.MAX_VALUE, 在這種情況下,如果生產者的速度一旦大於消費者的速度,可能還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗盡。可以通過指定容量建立LinkedBlockingQueue避免這種極端情況的發生。

雖然底層使用的也是ReentrantLock但take和put是分離的(生產和消費的鎖不是同一把鎖),高併發場景下效率仍然高於ArrayBlockingQueue。put方法在佇列滿的時候會阻塞直到有佇列成員被消費,take方法在佇列空的時候會阻塞,直到有佇列成員被放進來。

 

DelayQueue

DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。DelayQueue中的元素,只有指定的延遲時間到了,才能夠從佇列中獲取到該元素。

應用場景:

1.客戶端長時間佔用連線的問題,超過這個空閒時間了,可以移除的

2.處理長時間不用的快取:如果佇列裡面的物件長時間不用,超過空閒時間,就移除

3.任務超時處理

 

PriorityBlockingQueue

PriorityBlockingQueue不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此必須控制生產者生產資料的速度,避免消費者消費資料速度跟不上,否則時間一長,會最終耗盡所有的可用堆記憶體空間。

在向PriorityBlockingQueue中新增元素時,元素通過在實現實現Comparable介面,重寫compareTo()來定義優先順序的邏輯。它內部控制執行緒同步的鎖採用的是公平鎖。

 

SynchronousQueue

一種無緩衝的等待佇列,來一個任務就執行這個任務,這期間不能新增任何的任務。也就是不用阻塞了,其實對於少量任務而言,這種做法更高效。

宣告一個SynchronousQueue有兩種不同的方式,公平模式和非公平模式:

公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體現整體的公平策略;

非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。

 

ConcurrentLinkedQueue

不上鎖,高併發場景效率遠高於ArrayBlockingQueue和LinkedBlockingQueue等

 

容器

同步類容器

第一類:Vector、Stack、HashTable都是同步類,執行緒安全的,但高併發場景下仍然可能出現問題如ConcurrentModificationException。

第二類:Collections提供的一些工廠類(靜態),效率低

 

併發類容器

CopyOnWrite容器

寫時複製的容器:當我們往一個容器新增元素的時候,不直接往當前容器新增,而是先將當前容器進行copy,複製出一個新的容器,然後往新的容器裡新增元素,新增完元素之後,再將原容器的引用指向新的容器,非常適合讀多寫少的場景。

但同時存在如下問題:

資料一致性問題:CopyOnWrite容器是弱一致性的,即只能保證資料的最終一致性,不能保證資料的實時一致性。所以如果你希望寫入的的資料能夠即時讀到,不要使用CopyOnWrite容器。

記憶體佔用問題:因為CopyOnWrite 的寫時複製機制,所以在進行寫操作的時候,記憶體裡會同時駐紮兩個物件的記憶體,舊的物件和新寫入的物件。如果這些物件佔用的記憶體比較大,如果控制不好,比如寫特別多的情景,很有可能造成頻繁的Yong GC 和Full GC。針對記憶體佔用問題,可以通過壓縮容器中的元素的方法來減少大物件的記憶體消耗,或者不使用CopyOnWrite容器,而使用其他的併發容器,如ConcurrentHashMap。

有兩種常見的CopyOnWrite容器:CopyOnWriteArrayList和CopyOnWriteArraySet,其中CopyOnWriteArrayList是ArrayList 的一個執行緒安全的變體。

 

 

ConcurrentHashMap

筆者分JDK1.7和JDK1.8兩部分說明ConcurrentHashMap。

JDK1.7 ConcurrentHashMap

JDK1.7採用"鎖分段"技術來降低鎖的粒度,它把整個map劃分為一系列由segment組成的單元,一個segment相當於一個hashtable。通過這種方式,加鎖的物件就從整個map變成了一個segment。ConcurrentHashMap執行緒安全並且提高效能原因就在於:對map中的讀是併發的,無需加鎖;只有在put、remove操作時才加鎖,而加鎖僅是對需要操作的segment加鎖,不會影響其他segment的讀寫。因此不同的segment之間可以併發使用,極大地提高了效能。

根據原始碼又可得出查詢、插入、刪除的過程:通過key的hash確定segement(插入時如果segment大小達到擴容閾值則進行擴容) --> 確定連結串列陣列HashEntry下標(插入/刪除時,獲取連結串列頭) --> 遍歷連結串列【查詢:呼叫equals()進行比對,找到與所查詢key相等的結點並讀取;插入:如果找到相同的key的結點則更新value值,如果沒有則插入新結點;刪除:找到被刪除結點後,以被刪除結點的next結點開始建立新的連結串列,然後再把原連結串列頭直到被刪結點的前繼結點依次複製、插入新連結串列,最後把新連結串列頭設定為當前陣列下標元素取代舊連結串列。


JDK1.8ConcurrentHashMap

JDK1.8中的ConcurrentHashMap在JDK1.7上做了很多優化:

1. 取消segments欄位,直接採用transient volatile HashEntry<K,V>[] table儲存資料,採用table陣列元素作為鎖,從而實現了對每一行資料進行加鎖,通過進一步降低鎖粒度來減少併發衝突的概率

2. 將原先table陣列+連結串列的資料結構,變更為table陣列+連結串列+紅黑樹的結構。對於hash表來說,最核心的能力在於將key hash之後能均勻的分佈在陣列中。如果hash之後雜湊的很均勻,那麼table陣列中的每個佇列長度主要為0或者1。但實際情況並非總是如此理想,雖然ConcurrentHashMap類預設的載入因子為0.75,但是在資料量過大或者運氣不佳的情況下,還是會存在一些佇列長度過長的情況,如果還是採用單向列表方式,那麼查詢某個節點的時間複雜度為O(n);因此,對於個數超過8(預設值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度可以降低到O(logN),可以改進效能

3. 新增欄位transient volatile CounterCell[] counterCells,可方便的計算集合中所有元素的個數,效能大大優於jdk1.7中的size()方法

相信通過這些介紹,大家對於諸如"為什麼選擇ConcurrentHashMap?"會有很好的思路了。


關注微信公眾號:大資料學習與分享,獲取更對技術乾貨