1. 程式人生 > >Java併發包原始碼學習系列:阻塞佇列實現之SynchronousQueue原始碼解析

Java併發包原始碼學習系列:阻塞佇列實現之SynchronousQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359) - [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) - [Java併發包原始碼學習系列:ReentrantLock可重入獨佔鎖詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112454874) - [Java併發包原始碼學習系列:ReentrantReadWriteLock讀寫鎖解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112689635) - [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) - [Java併發包原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113059783) - [Java併發包原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113186979) - [Java併發包原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113252384) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113329416) - [Java併發包原始碼學習系列:阻塞佇列實現之PriorityBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113358710) - [Java併發包原始碼學習系列:阻塞佇列實現之DelayQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113440013) ## SynchronousQueue概述 SynchronousQueue是一個**不儲存元素**的阻塞佇列,**每個插入的操作必須等待另一個執行緒進行相應的刪除操作**,反之亦然,因此這裡的Synchronous指的是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒。 你不能在該佇列中使用peek方法,因為peek是隻讀取不移除,不符合該佇列特性,該佇列不儲存任何元素,資料必須從某個寫執行緒交給某個讀執行緒,而不是在佇列中等待倍消費,非常適合傳遞性場景。 SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue。 該類還支援可供選擇的**公平性策略**,預設採用非公平策略,當佇列可用時,阻塞的執行緒都可以爭奪訪問佇列的資格。 ## 使用案例 ```java public class TestSync { public static void main (String[] args) { SynchronousQueue queue = new SynchronousQueue<>(true); Producer producer = new Producer(queue); Customer customer = new Customer(queue); producer.start(); customer.start(); } } class Producer extends Thread{ SynchronousQueue queue; Producer(SynchronousQueue queue){ this.queue = queue; } @SneakyThrows @Override public void run () { while(true){ int product = new Random().nextInt(500); System.out.println("生產產品, id : " + product); System.out.println("等待3s後給消費者消費..."); TimeUnit.SECONDS.sleep(3); queue.put(product); TimeUnit.MILLISECONDS.sleep(100); } } } class Customer extends Thread{ SynchronousQueue queue; Customer(SynchronousQueue queue){ this.queue = queue; } @SneakyThrows @Override public void run () { while(true){ Integer product = queue.take(); System.out.println("消費產品, id : " + product); System.out.println(); } } } // 列印結果 生產產品, id : 194 等待3s後給消費者消費... 消費產品, id : 194 生產產品, id : 140 等待3s後給消費者消費... 消費產品, id : 140 生產產品, id : 40 等待3s後給消費者消費... 消費產品, id : 40 ``` ## 類圖結構 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210201194918815-287852260.png) ## put與take方法 ### void put(E e) ```java public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // put方法 : e是生產者傳遞給消費者的元素 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } ``` ### E take() ```java public E take() throws InterruptedException { // take方法: 表示消費者等待生產者提供元素 E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } ``` put方法和take方法都呼叫了transferer的transfer方法,他們的區別在哪呢?我們可以發現: - 當呼叫put方法,也就是生產者將資料傳遞給消費者時,傳遞的引數為e,是一個非null的元素。 - 而呼叫take方法,也就是消費者希望生產者提供元素時,傳遞的引數為null。 這一點必須明確,transfer是根據這一點來判斷讀or寫執行緒,接著決定是否匹配等,直接來看下Transfer類吧。 ## Transfer ```java public class SynchronousQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { private transient volatile Transferer transferer; } ``` SynchronousQueue內部維護了volatile修飾的Transferer變數,它的核心操作都將委託給transferer。 ```java abstract static class Transferer { /** * Performs a put or take. */ abstract E transfer(E e, boolean timed, long nanos); } ``` Transferer類中定義了抽象方法transfer,該方法用於轉移元素,是最最核心的方法,我們先大概瞭解一下定義: - 引數e如果不為null,表示將該元素從生產者轉移給消費者。如果為null,則表示消費者等待生產者提供元素,返回值E就是得到的元素。 - 引數timed表示是否設定超時,如果設定超時,nanos就是需要設定的超時時間。 - 該方法的返回值可以非null,就是消費者從生產者那得到的值,可以為null,代表超時或者中斷,具體需要通過檢測中斷狀態得到。 ```java // 預設使用非公平策略 public SynchronousQueue() { this(false); } /** * 指定公平策略, */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } ``` 可以發現,在構造SynchronousQueue的時候,可以傳入fair引數指定公平策略,有下面兩種選擇: 1. 公平策略:例項化TransferQueue。 2. 非公平策略:例項化TransferStack,預設就是非公平模式。 他倆便是Transfer類的實現,SynchronousQueue相關操作也都是基於這倆類的,我們接下來將會重點分析這倆的實現。 ## 公平模式TransferQueue ```java static final class TransferQueue extends Transferer { static final class QNode{...} transient volatile QNode head; transient volatile QNode tail; transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // 初始化虛擬頭節點 head = h; tail = h; } ``` ### QNode QNode定義了佇列中存放的節點: - next指向下一個節點。 - item用於存放資料,資料修改通過CAS操作完成。 - waiter標記在該節點上等待的執行緒。 - isData用來標識該節點的型別,傳遞引數e不為null,則isData為true。 ```java static final class QNode { volatile QNode next; // next域 volatile Object item; // 存放資料,用CAS設定 volatile Thread waiter; // 標記在該節點上等待的執行緒是哪個 final boolean isData; // isData == true表示寫執行緒節點 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // ...省略一系列CAS方法 } ``` ### transfer ```java E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed // 判斷當前節點的模式 boolean isData = (e != null); // 迴圈 for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin // 佇列為空 或 當前節點和佇列尾節點型別相同,則將節點入隊 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 說明有其他節點入隊,導致讀到的tail不一致,continue if (t != tail) // inconsistent read continue; // 有其他節點入隊,但是tail是一致的,嘗試將tn設定為尾節點,continue if (tn != null) { // lagging tail advanceTail(t, tn); // 如果tail為t,設定為tn continue; } // timed == true 並且超時了, 直接返回null if (timed && nanos <= 0) // can't wait return null; // 構建一個新節點 if (s == null) s = new QNode(e, isData); // 將當前節點插入到tail之後,如不成功,則continue if (!t.casNext(null, s)) // failed to link in continue; // 將當前節點設定為新的tail advanceTail(t, s); // swing tail and wait // 這個方法下面會分析:自旋或阻塞執行緒,直到滿足s.item != e Object x = awaitFulfill(s, e, timed, nanos); // x == s 表示節點被取消、中斷或超時 if (x == s) { // wait was cancelled clean(t, s); return null; } // isOffList用於判斷節點是否已經出隊 next == this if (!s.isOffList()) { // not already unlinked // 嘗試將s節點設定為head advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 佇列不為空 且節點型別不同,一個讀一個寫,就可以匹配了 } else { // complementary-mode // 隊頭節點 QNode m = h.next; // node to fulfill // 這裡如果其他執行緒對佇列進行了操作,就重新再來 if (t != tail || m == null || h != head) continue; // inconsistent read // 下面是出隊的程式碼 Object x = m.item; //isData == (x != null) 判斷isData的型別是否和隊頭節點型別相同 // x == m 表示m被取消了 // !m.casItem(x, e))表示將e設定為m的item失敗 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS // 上面三種情況,任意一種發生,都進行h的出隊操作,m變成head,然後重試 advanceHead(h, m); // dequeue and retry continue; } // 匹配成功,將m變為head,虛擬節點 advanceHead(h, m); // successfully fulfilled // 喚醒在m上等待的執行緒 LockSupport.unpark(m.waiter); // 得到資料 return (x != null) ? (E)x : e; } } } ``` ### awaitFulfill 這個方法將會進行自旋或者阻塞,直到滿足某些條件。 ```java //Spins/blocks until node s is fulfilled. Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 計算需要自旋的次數 // 如果恰好 s 正好是第一個加入的節點,則會自旋一段時間,避免阻塞,提高效率 // 因為其他情況是會涉及到 park掛起執行緒的 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // w為當前執行緒,如果被中斷了,則取消該節點 if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; // 滿足這個條件,才會退出迴圈,也是唯一的出口 // 如果 執行緒1、被阻塞,接著喚醒或者2、中斷了,x != e 就會成立 if (x != e) return x; // 如果設定了timed,需要判斷一下是否超時 if (timed) { nanos = deadline - System.nanoTime(); // 如果超時,取消該節點,continue,下一次在 x!=e時退出迴圈 if (nanos <= 0L) { s.tryCancel(e); continue; } } // 每次減少自旋次數 if (spins >
0) --spins; // 次數用完了,設定一下s的等待執行緒為當前執行緒 else if (s.waiter == null) s.waiter = w; // 沒有超時設定的阻塞 else if (!timed) LockSupport.park(this); // 剩餘時間小於spinForTimeoutThreshold的時候,自旋效能的效率更高 else if (nanos >
spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } ``` 這邊總結一下一些注意點: 1. 為了優化阻塞,先判斷當前的節點s是不是head.next,如果是的話,會優先選擇自旋而不是阻塞,自旋次數到了才阻塞,主要是考慮到阻塞、喚醒需要消耗更多的資源。 2. 自旋的過程如何退出,也就是何時滿足x!=e的條件呢?其實在tryCancel的時候就會導致x!=e,因為該方法會將s的item設定為this。我們看到,執行緒被中斷,超時的時候都會呼叫這個方法,這些條件下將會退出。 ### tryCancel 取消操作其實就是將節點的item設定為this, ```java void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } boolean isCancelled() { return item == this; } ``` 也就是說,如果一旦執行了tryCancel操作【中斷,取消,超時】,退出awaitFulfill之後,一定滿足: ```java // x == s 表示節點被取消、中斷或超時 if (x == s) { // wait was cancelled clean(t, s); return null; } ``` 會執行clean方法清理s節點: ### clean ```java void clean(QNode pred, QNode s) { s.waiter = null; // 清除thread引用 /* * 無論何時,佇列中的最後一個節點都無法刪除,因此使用cleanMe儲存它的前驅 */ while (pred.next == s) { QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head // 隊頭被取消的情況,出隊 if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) // 佇列此時為空,就退出了 return; QNode tn = t.next; if (t != tail) // 隊尾併發改變了 continue; // tn一直定位到為null if (tn != null) { advanceTail(t, tn); continue; } // 這裡 s!= t 表示沒有到要刪除的元素不是最後一個, // 那麼直接將pred.next = s.next就可以了 if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) // 刪除完畢,退出 return; } // 走到這裡,說明需要刪除的s節點是隊尾節點,需要使用cleanMe QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node // d這裡指的就是 要刪除的節點 QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); // 清除cleanMe if (dp == pred) return; // s is already saved node // 該分支將dp定位到 pred的位置【第一次應該都會走到這】 } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } } ``` 注意:無論何時, 最後插入的節點不能被刪除,因為直接刪除會存在併發風險,當節點s是最後一個節點時, 將s.pred儲存為cleamMe節點,下次再進行清除操作。 ### TransferQueue總結 transfer就是在一個迴圈中,不斷地去做下面這些事情: 1. 當呼叫transfer方法時,如果佇列為空或隊尾節點的型別和執行緒型別相同【t.isData== isData】,將當前執行緒加入佇列,自旋的方式等待匹配。直到被匹配或超時,或中斷或取消。 2. 如果佇列不為空且隊中存在可以匹配當前執行緒的節點,將匹配的執行緒出隊,重新設定隊頭,返回資料。 注意:無論是上面哪種情況,都會不斷檢測是否有其他執行緒在進行操作,如果有的話,會幫助其他執行緒執行入隊出隊操作。 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210201194926230-394402060.png) ## 非公平模式TransferStack TransferStack就大致過一下吧: ```java static final class TransferStack extends Transferer { // 表示一個未匹配的消費者 static final int REQUEST = 0; // 代表一個未匹配的生產者 static final int DATA = 1; // 表示匹配另一個生產者或消費者 static final int FULFILLING = 2; // 頭節點 volatile SNode head; // SNode節點定義 static final class SNode {...} ``` ### SNode ```java static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } } ``` ### transfer ```java E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; // e為null表示讀,非null表示寫 for (;;) { SNode h = head; // 如果棧為空,或者節點模式和頭節點模式相同, 將節點壓入棧 if (h == null || h.mode == mode) { // empty or same-mode // 處理超時 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) // 頭節點彈出 casHead(h, h.next); // pop cancelled node else return null; //未超時情況,生成snode節點,嘗試將s設定為頭節點 } else if (casHead(h, s = snode(s, e, h, mode))) { // 自旋,等待執行緒匹配 SNode m = awaitFulfill(s, timed, nanos); // 表示節點被取消、或中斷、或超時 if (m == s) { // wait was cancelled // 清理節點 clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 如果是請求資料,則返回匹配的item, 否則返回s的item return (E) ((mode == REQUEST) ? m.item : s.item); } // 棧不為空, 且模式不相等,說明是一對匹配的節點 // 嘗試用節點s 去滿足 h, 這裡判斷 (m & FULFILLING) == 0會走這個分支 } else if (!isFulfilling(h.mode)) { // try to fulfill // h已經被取消了 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 將當前節點 標記為FULFILLING, 並設定為head else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear // 這裡m是頭節點 SNode m = s.next; // m is s's match // 說明被其他執行緒搶走了,重新設定head if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } // 得到與m匹配的節點 SNode mn = m.next; // 嘗試去匹配,匹配成功會喚醒等待的執行緒 if (m.tryMatch(s)) { // 匹配成功,兩個都彈出 casHead(s, mn); // pop both s and m // 返回資料節點的值 m.item return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } // 走到這,表示有其他執行緒在進行配對(m & FULFILLING) != 0 // 幫助進行匹配,接著執行出棧操作 } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } } ``` ### TransferStack總結 transfer方法其實就是在一個迴圈中持續地去做下面三件事情: 1. 當呼叫transfer時,如果棧是空的,或者當前執行緒型別和head節點型別相同,則將當前執行緒加入棧中,通過自旋的方式等待匹配。最後返回匹配的節點,如果被取消,則返回null。 2. 如果棧不為空,且有節點可以和當前執行緒進行匹配【讀與寫表示匹配,mode不相等】,CAS加上`FULFILLING`標記,將當前執行緒壓入棧頂,和棧中的節點進行匹配,匹配成功,出棧這兩個節點。 3. 如果棧頂是正在進行匹配的節點`isFulfilling(h.mode)`,則幫助它進行匹配並出棧,再執行後續操作。 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210201194934486-783769330.png) ## 總結 SynchronousQueue是一個**不儲存元素**的阻塞佇列,**每個插入的操作必須等待另一個執行緒進行相應的刪除操作**,反之亦然,因此這裡的Synchronous指的是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒。 該類還支援可供選擇的**公平性策略**,針對不同的公平性策略有兩種不同的Transfer實現,TransferQueue實現公平模式和TransferStack實現非公平模式。 take和put操作都呼叫了transfer核心方法,根據傳入的引數e是否為null來對應處理。 >
最後:Synchronous好抽象啊,好難懂,有很多地方畫了圖也是很難理解,如有不足,望評論區指教。 ## 參考閱讀 - 《Java併發程式設計的藝術》 - 《Java併發程式設計之美》 - [面試準備--執行緒池佇列 SynchronousQueue 詳解](https://blog.csdn.net/weixin_41622183/article/details/89283085) - [javadoop: 解讀 java 併發佇列 BlockingQueue](https://javadoop.com/post/java-concurrent-queue#toc_3) - [http://cmsblogs.com/?p=2418](http://cmsblogs.com/?p=2