Java併發49:併發集合系列-基於獨佔鎖+連結串列實現的單向阻塞無界佇列LinkedBlockingQueue
原文地址:http://www.importnew.com/25583.html
一、前言
前面介紹了使用CAS實現的非阻塞佇列ConcurrentLinkedQueue,下面就來介紹下使用獨佔鎖實現的阻塞佇列LinkedBlockingQueue的實現。
二、 LinkedBlockingQueue類圖結構
如圖LinkedBlockingQueue中:
- 也有兩個Node分別用來存放首尾節點,
- 並且裡面有個初始值為0的原子變數count用來記錄佇列元素個數,
- 另外裡面有兩個ReentrantLock的獨佔鎖,分別用來控制元素入隊和出隊加鎖,
- 其中takeLock用來控制同時只有一個執行緒可以從佇列獲取元素,其他執行緒必須等待,
- putLock控制同時只能有一個執行緒可以獲取鎖去新增元素,其他執行緒必須等待。
- 另外notEmpty和notFull用來實現入隊和出隊的同步。
另外由於出入隊是兩個非公平獨佔鎖,所以可以同時有一個執行緒入隊和一個執行緒出隊,其實這個是個生產者-消費者模型。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
* Current number of elements / private final AtomicInteger count = new AtomicInteger(0);
public static final int MAX_VALUE = 0x7fffffff;public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化首尾節點 last = head = new Node<E>(null); }
如圖預設佇列容量為0x7fffffff
,使用者也可以自己指定容量。
三、必備基礎
3.1 ReentrantLock
可以參考:Java併發19:Lock系列-Lock介面基本方法學習例項
3.2 條件變數(Condition)
可以參考:Java併發20:Lock系列-Condition介面基本方法學習例項
四 、帶超時時間的offer操作-生產者
在隊尾新增元素,
- 如果佇列滿了,那麼等待timeout時候,如果時間超時則返回false,
- 如果在超時前佇列有空餘空間,則插入後返回true。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { //空元素拋空指標異常 if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //獲取可被中斷鎖,只有一個執行緒克獲取 putLock.lockInterruptibly(); try { //如果佇列滿則進入迴圈 while (count.get() == capacity) { //nanos<=0直接返回 if (nanos <= 0) return false; //否者呼叫await進行等待,超時則返回<=0(1) nanos = notFull.awaitNanos(nanos); } //await在超時時間內返回則新增元素(2) enqueue(new Node<E>(e)); c = count.getAndIncrement(); //佇列不滿則啟用其他等待入隊執行緒(3) if (c + 1 < capacity) notFull.signal(); } finally { //釋放鎖 putLock.unlock(); } //c==0說明佇列裡面有一個元素,這時候喚醒出隊執行緒(4) if (c == 0) signalNotEmpty(); return true; } private void enqueue(Node<E> node) { last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
五、 帶超時時間的poll操作-消費者
獲取並移除隊首元素,在指定的時間內去輪詢佇列看有沒有首元素有則返回,否者超時後返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //出隊執行緒獲取獨佔鎖 takeLock.lockInterruptibly(); try { //迴圈直到佇列不為空 while (count.get() == 0) { //超時直接返回null if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } //出隊,計數器減一 x = dequeue(); c = count.getAndDecrement(); //如果出隊前佇列不為空則傳送訊號,啟用其他阻塞的出隊執行緒 if (c > 1) notEmpty.signal(); } finally { //釋放鎖 takeLock.unlock(); } //當前佇列容量為最大值-1則啟用入隊執行緒。 if (c == capacity) signalNotFull(); return x; }
六、put操作-生產者
與帶超時時間的poll類似不同在於put時候如果當前佇列滿了它會一直等待其他執行緒呼叫notFull.signal才會被喚醒。
七、 take操作-消費者
與帶超時時間的poll類似不同在於take時候如果當前佇列空了它會一直等待其他執行緒呼叫notEmpty.signal()才會被喚醒。
八、 size操作
當前佇列元素個數,如程式碼直接使用原子變數count獲取。
public int size() { return count.get(); }
九、peek操作
獲取但是不移除當前佇列的頭元素,沒有則返回null:
public E peek() { //佇列空,則返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
十、 remove操作
刪除佇列裡面的一個元素,有則刪除返回true,沒有則返回false。
在刪除操作時候由於要遍歷佇列所以加了雙重鎖,也就是在刪除過程中不允許入隊也不允許出隊操作:
public boolean remove(Object o) { if (o == null) return false; //雙重加鎖 fullyLock(); try { //遍歷佇列找則刪除返回true for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } //找不到返回false return false; } finally { //解鎖 fullyUnlock(); } } void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; if (last == p) last = trail; //如果當前佇列滿,刪除後,也不忘記最快的喚醒等待的執行緒 if (count.getAndDecrement() == capacity) notFull.signal(); }
十一、開源框架中使用
tomcat中任務佇列TaskQueue
11.1 類圖結構
可知TaskQueue繼承了LinkedBlockingQueue並且泛化型別固定了為Runnalbe.重寫了offer、poll、take
方法。
十二、總結
LinkedBlockingQueue與ConcurrentLinkedQueue相比前者是阻塞佇列,使用可重入獨佔的非公平鎖來實現。
通過使用put鎖和take鎖使得入隊和出隊解耦可以同時進行處理,但是同時只有一個執行緒可以入隊或者出隊,其他執行緒必須等待。
另外引入了條件變數來進行入隊和出隊的同步,每個條件變數維護一個條件佇列用來存放阻塞的執行緒。
LinkedBlockingQueue的size操作通過使用原子變數count獲取能夠比較精確的獲取當前佇列的元素個數。
另外remove方法使用雙鎖保證刪除時候佇列元素保持不變,另外其實這個是個生產者-消費者模型。
而ConcurrentLinkedQueue則使用CAS非阻塞演算法來實現,使用CAS原子操作保證連結串列構建的安全性。
當多個執行緒併發時候CAS失敗的執行緒不會被阻塞,而是使用cpu資源去輪詢CAS直到成功。
size方法先比LinkedBlockingQueue的獲取的個數是不精確的,因為獲取size的時候是通過遍歷佇列進行的,而遍歷過程中可能進行增加刪除操作,remove方法操作時候也沒有對整個佇列加鎖。
remove時候可能進行增加刪除操作,這就可能刪除了一個剛剛新增的元素,而不是刪除的想要位置的。