JUC原始碼分析-集合篇(八):SynchronousQueue
SynchronousQueue 是一個同步阻塞佇列,它的每個插入操作都要等待其他執行緒相應的移除操作,反之亦然。SynchronousQueue 像是生產者和消費者的會合通道,它比較適合“切換”或“傳遞”這種場景:一個執行緒必須同步等待另外一個執行緒把相關資訊/時間/任務傳遞給它。在之後的執行緒池原始碼分析中我們也會見到它,所以理解本章對我們之後的執行緒池講解也會有很大幫助。
概述
SynchronousQueue(後面稱SQ)內部沒有容量,所以不能通過
peek
方法獲取頭部元素;也不能單獨插入元素,可以簡單理解為它的插入和移除是“一對”對稱的操作。為了相容 Collection 的某些操作(例如contains
),SQ 扮演了一個空集合的角色。 SQ 的一個典型應用場景是線上程池中,Executors.newCachedThreadPool() 就使用了它,這個構造使執行緒池根據需要(新任務到來時)建立新的執行緒,如果有空閒執行緒則會重複使用,執行緒空閒了60秒後會被回收。
SQ 為等待過程中的生產者或消費者執行緒提供可選的公平策略(預設非公平模式)。非公平模式通過棧(LIFO)實現,公平模式通過佇列(FIFO)實現。使用的資料結構是雙重佇列(Dual queue)和雙重棧(Dual stack)(後面詳細講解)。FIFO通常用於支援更高的吞吐量,LIFO則支援更高的執行緒區域性儲存(TLS)。
SQ 的阻塞演算法可以歸結為以下幾點:
- 使用了雙重佇列(Dual queue)和雙重棧(Dual stack)儲存資料,佇列中的每個節點都可以是一個生產者或是消費者。(有關雙重佇列,請參考筆者另外一篇文章:JUC原始碼分析-集合篇(四):LinkedTransferQueue,雙重棧與它原理一致)
- 已取消節點引用指向自身,避免垃圾保留和記憶體損耗
- 通過自旋和 LockSupport 的 park/unpark 實現阻塞,在高爭用環境下,自旋可以顯著提高吞吐量。
資料結構
SynchronousQueue 繼承關係
SQ 有三個內部類:
- Transferer:內部抽象類,只有一個
transfer
方法。SQ的put
和take
transfer
方法),因為在雙重佇列/棧資料結構中,put
和take
操作是對稱的,所以幾乎所有程式碼都可以合併。 - TransferStack:繼承了內部抽象類 Transferer,實現了
transfer
方法,用於非公平模式下的佇列操作,資料按照LIFO的順序。內部通過單向連結串列 SNode 實現的雙重棧。 - TransferQueue:繼承了內部抽象類 Transferer,實現了
transfer
方法,用於公平模式下的佇列操作,資料按照FIFO的順序。內部通過單向連結串列 QNode 實現的雙重佇列。
SNode & QNode
SNode 是雙重棧的實現,內部除了基礎的連結串列指標和資料外,還維護了一個int
型變數mode
,它是實現雙重棧的關鍵欄位,有三個取值:0代表消費者節點(take),1代表生產者節點(put),2 | mode
(mode為當前操作者模式:put or take)代表節點已被匹配。此外還有一個match
引用,用於匹配時標識匹配的節點,節點取消等待後match
引用指向自身。
原始碼解析
SQ 的 put/take 操作完全是由transfer
方法實現,以put
方法為例,
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
可以看到呼叫了內部變數 transferer 的transfer
的方法。其它例如offer、take、poll
都與之類似,所以接下來我們主要針對transfer
方法,來分析 SQ 公平模式和非公平模式的不同實現。
TransferStack.transfer()
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
//根據所傳元素判斷為生產or消費
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())//head已經被匹配,修改head繼續迴圈
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {//構建新的節點s,放到棧頂
//等待s節點被匹配,返回s.match節點m
SNode m = awaitFulfill(s, timed, nanos);
//s.match==s(等待被取消)
if (m == s) { // wait was cancelled
clean(s);//清除s節點
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { //head節點還沒有被匹配,嘗試匹配 try to fulfill
if (h.isCancelled()) // already cancelled
//head已經被匹配,修改head繼續迴圈
casHead(h, h.next); // pop and retry
//構建新節點,放到棧頂
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
//cas成功後s的match節點就是s.next,即m
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {//嘗試匹配,喚醒m節點的執行緒
casHead(s, mn); //彈出匹配成功的兩個節點,替換head pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); //匹配失敗,刪除m節點,重新迴圈 help unlink
}
}
} else { //頭節點正在匹配 help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {//幫助頭節點匹配
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
說明:基本演算法是迴圈嘗試以下三種行為之一:
-
如果棧為空或者已經包含了一個相同的 mode,此時分兩種情況:如果是非計時操作(
offer、poll
)或者已經超時,直接返回null;其他情況下就把當前節點壓進棧頂等待匹配(通過awaitFulfill
方法),匹配成功後返回匹配節點的 item,如果節點取消等待就呼叫clean
方法(後面單獨講解)清除取消等待的節點,並返回 null。 -
如果棧頂節點(
head
)還沒有被匹配(通過isFulfilling
方法判斷),則把當前節點壓入棧頂,並嘗試與head
節點進行匹配,匹配成功後從棧中彈出這兩個節點,並返回匹配節點的資料。isFulfilling
原始碼如下:
/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
- 如果棧頂節點(
head
)已經持有另外一個數據節點,說明棧頂節點正在匹配,則幫助此節點進行匹配操作,然後繼續從第一步開始迴圈。
TransferStack. awaitFulfill()
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//計算截止時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//計算自旋次數
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())//當前執行緒被中斷
//取消對給定節點s的匹配節點的等待
s.tryCancel();
SNode m = s.match;//獲取給定節點s的match節點
if (m != null)//已經匹配到,返回匹配節點
return m;
if (timed) {
//超時處理
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();//超時,取消s節點的匹配,match指向自身
continue;
}
}
if (spins > 0)
//spins-1
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
//設定給定節點s的waiter為當前執行緒
s.waiter = w; // establish waiter so can park next iter
else if (!timed)//沒有設定超時,直接阻塞
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)//阻塞指定超時時間
LockSupport.parkNanos(this, nanos);
}
}
說明:如果當前操作是一個不計時操作,或者是一個還未到超時時間的操作,就構建新的節點壓入棧頂。然後呼叫此方法自旋/阻塞等待給定節點s
被匹配。
當呼叫此方法時,所傳引數節點s
一定是在棧頂,節點真正阻塞前會先自旋,以防生產者和消費者到達的時間點非常接近時也被 park。
當節點/執行緒需要阻塞時,首先設定waiter
欄位為當前執行緒,然後在真正阻塞之前重新檢查一下waiter
的狀態,因為線上程競爭中,需要確認waiter
沒有被其他執行緒佔用。
從主迴圈返回的檢查順序可以反映出中斷優先於正常返回。除了不計時操作(poll/offer
)不會檢查中斷,而是直接在transfer
方法中入棧等待匹配。
TransferQueue.transfer()
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//判斷put or take
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { //尾節點滯後,更新尾節點 lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
//為當前操作構造新節點,並放到隊尾
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
//推進tail
advanceTail(t, s); // swing tail and wait
//等待匹配,並返回匹配節點的item,如果取消等待則返回該節點s
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s); //等待被取消,清除s節點
return null;
}
if (!s.isOffList()) { // s節點尚未出列 not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;//item指向自身
s.waiter = null;
}
return (x != null) ? (E)x : e;
//take
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || //m.item=m, m cancelled
!m.casItem(x, e)) { // 匹配,CAS修改item為給定元素e lost CAS
advanceHead(h, m); // 推進head,繼續向後查詢 dequeue and retry
continue;
}
advanceHead(h, m); //匹配成功,head出列 successfully fulfilled
LockSupport.unpark(m.waiter); //喚醒被匹配節點m的執行緒
return (x != null) ? (E)x : e;
}
}
}
說明:基本演算法是迴圈嘗試以下兩個動作中的其中一個:
-
若佇列為空或者佇列中的尾節點(
tail
)和自己的模式相同,則把當前節點新增到佇列尾,呼叫awaitFulfill
等待節點被匹配。匹配成功後返回匹配節點的 item,如果等待節點被中斷或等待超時返回null
。在此期間會不斷檢查tail
節點,如果tail
節點被其他執行緒修改,則向後推進tail
繼續迴圈嘗試。注:TransferQueue 的awaitFulfill
方法與TransferStack.awaitFulfill
演算法一致,後面就不再講解了。 -
如果當前操作模式與尾節點(
tail
)不同,說明可以進行匹配,則從佇列頭節點head
開始向後查詢一個互補節點進行匹配,嘗試通過CAS
修改互補節點的item
欄位為給定元素e
,匹配成功後向後推進head
,並喚醒被匹配節點的waiter
執行緒,最後返回匹配節點的item
。
棧/佇列節點清除的對比(clean方法)
在佇列和棧中進行清理的方式不同: 對於佇列來說,如果節點被取消,我們幾乎總是可以以 O1 的時間複雜度移除節點。但是如果節點在隊尾,它必須等待後面節點的取消。 對於棧來說,我們可能需要 O(n) 的時間複雜度去遍歷整個棧,然後確定節點可被移除,但這可以與訪問棧的其他執行緒並行執行。
下面我們來看一下 TransferStack 和 TransferQueue 對節點清除方法的優化:
TransferStack.clean(SNode s)
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
//找到有效head
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
//移除head到past中已取消節點的連結
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
說明:在最壞的情況下可能需要遍歷整個棧來解除給定節點s
的連結(例如給定節點在棧底)。在併發情況下,如果有其他執行緒已經移除給定節點s
,當前執行緒可能無法看到,但是我們可以使用這樣一種演算法:
使用s.next
作為past
節點,如果past
節點已經取消,則使用past.next
節點,然後依次解除從head
到past
中已取消節點的連結。在這裡不會做更深的檢查,因為為了找到失效節點而進行兩次遍歷是不值得的。
TransferQueue.clean(QNode pred, QNode s)
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
//找到有效head節點
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)//佇列為空,直接返回
return;
QNode tn = t.next;
if (t != tail)//tail節點被其他執行緒修改,重新迴圈
continue;
//找到tail節點
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))//cas解除s的連結
return;
}
//s是佇列尾節點,此時無法刪除s,只能去清除cleanMe節點
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))//原cleanMe為空,標記pred為cleanMe,延遲清除s節點
return; // Postpone cleaning s
}
}
說明:方法引數中s
為已經取消的節點,pred
為s
的前繼節點。
任何時候在佇列中都存在一個不能刪除的節點,也就是最後被插入的那個節點(tail
節點)。為了滿足這一點,在 TransferQueue 中維護了一個cleanMe
節點引用。當給定s
節點為tail
節點時,首先刪除cleanMe
節點引用;然後儲存s
的前繼節點作為cleanMe
節點,在下次清除操作時再清除節點。這樣保證了在s
節點和cleanMe
節點中至少有一個是可以刪除的。
小結
本章重點:理解 SynchronousQueue 中雙重棧和雙重佇列的實現;理解 SynchronousQueue 的阻塞演算法。
作者:泰迪的bagwell 連結:https://www.jianshu.com/p/c4855acb57ec 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。