1. 程式人生 > >JUC原始碼分析-集合篇(八):SynchronousQueue

JUC原始碼分析-集合篇(八):SynchronousQueue

SynchronousQueue 是一個同步阻塞佇列,它的每個插入操作都要等待其他執行緒相應的移除操作,反之亦然。SynchronousQueue 像是生產者和消費者的會合通道,它比較適合“切換”或“傳遞”這種場景:一個執行緒必須同步等待另外一個執行緒把相關資訊/時間/任務傳遞給它。在之後的執行緒池原始碼分析中我們也會見到它,所以理解本章對我們之後的執行緒池講解也會有很大幫助。

概述

SynchronousQueue(後面稱SQ)內部沒有容量,所以不能通過peek方法獲取頭部元素;也不能單獨插入元素,可以簡單理解為它的插入和移除是“一對”對稱的操作。為了相容 Collection 的某些操作(例如contains

),SQ 扮演了一個空集合的角色。 SQ 的一個典型應用場景是線上程池中,Executors.newCachedThreadPool() 就使用了它,這個構造使執行緒池根據需要(新任務到來時)建立新的執行緒,如果有空閒執行緒則會重複使用,執行緒空閒了60秒後會被回收。

SQ 為等待過程中的生產者或消費者執行緒提供可選的公平策略(預設非公平模式)。非公平模式通過棧(LIFO)實現,公平模式通過佇列(FIFO)實現。使用的資料結構是雙重佇列(Dual queue)雙重棧(Dual stack)(後面詳細講解)。FIFO通常用於支援更高的吞吐量,LIFO則支援更高的執行緒區域性儲存(TLS)。

SQ 的阻塞演算法可以歸結為以下幾點:

  • 使用了雙重佇列(Dual queue)雙重棧(Dual stack)儲存資料,佇列中的每個節點都可以是一個生產者或是消費者。(有關雙重佇列,請參考筆者另外一篇文章:JUC原始碼分析-集合篇(四):LinkedTransferQueue,雙重棧與它原理一致)
  • 已取消節點引用指向自身,避免垃圾保留和記憶體損耗
  • 通過自旋和 LockSupport 的 park/unpark 實現阻塞,在高爭用環境下,自旋可以顯著提高吞吐量。

資料結構

SynchronousQueue 繼承關係

SQ 有三個內部類:

  1. Transferer:內部抽象類,只有一個transfer方法。SQ的puttake
    被統一為一個方法(就是這個transfer方法),因為在雙重佇列/棧資料結構中,puttake操作是對稱的,所以幾乎所有程式碼都可以合併。
  2. TransferStack:繼承了內部抽象類 Transferer,實現了transfer方法,用於非公平模式下的佇列操作,資料按照LIFO的順序。內部通過單向連結串列 SNode 實現的雙重棧。
  3. TransferQueue:繼承了內部抽象類 Transferer,實現了transfer方法,用於公平模式下的佇列操作,資料按照FIFO的順序。內部通過單向連結串列 QNode 實現的雙重佇列。

SNode & QNode

SNode 是雙重棧的實現,內部除了基礎的連結串列指標和資料外,還維護了一個int型變數mode,它是實現雙重棧的關鍵欄位,有三個取值:0代表消費者節點(take),1代表生產者節點(put),2 | mode(mode為當前操作者模式:put or take)代表節點已被匹配。此外還有一個match引用,用於匹配時標識匹配的節點,節點取消等待後match引用指向自身。

原始碼解析

SQ 的 put/take 操作完全是由transfer方法實現,以put方法為例,

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

可以看到呼叫了內部變數 transferer 的transfer的方法。其它例如offer、take、poll都與之類似,所以接下來我們主要針對transfer方法,來分析 SQ 公平模式和非公平模式的不同實現。

TransferStack.transfer()

E transfer(E e, boolean timed, long nanos) {
    
    SNode s = null; // constructed/reused as needed
    //根據所傳元素判斷為生產or消費
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())//head已經被匹配,修改head繼續迴圈
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {//構建新的節點s,放到棧頂
                //等待s節點被匹配,返回s.match節點m
                SNode m = awaitFulfill(s, timed, nanos);
                //s.match==s(等待被取消)
                if (m == s) {               // wait was cancelled
                    clean(s);//清除s節點
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { //head節點還沒有被匹配,嘗試匹配 try to fulfill
            if (h.isCancelled())            // already cancelled
                //head已經被匹配,修改head繼續迴圈
                casHead(h, h.next);         // pop and retry

            //構建新節點,放到棧頂
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    //cas成功後s的match節點就是s.next,即m
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;

                    if (m.tryMatch(s)) {//嘗試匹配,喚醒m節點的執行緒
                        casHead(s, mn);     //彈出匹配成功的兩個節點,替換head pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   //匹配失敗,刪除m節點,重新迴圈 help unlink
                }
            }
        } else {                            //頭節點正在匹配 help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {//幫助頭節點匹配
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

說明:基本演算法是迴圈嘗試以下三種行為之一:

  1. 如果棧為空或者已經包含了一個相同的 mode,此時分兩種情況:如果是非計時操作(offer、poll)或者已經超時,直接返回null;其他情況下就把當前節點壓進棧頂等待匹配(通過awaitFulfill方法),匹配成功後返回匹配節點的 item,如果節點取消等待就呼叫clean方法(後面單獨講解)清除取消等待的節點,並返回 null。

  2. 如果棧頂節點(head)還沒有被匹配(通過isFulfilling方法判斷),則把當前節點壓入棧頂,並嘗試與head節點進行匹配,匹配成功後從棧中彈出這兩個節點,並返回匹配節點的資料。isFulfilling原始碼如下:

/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
  1. 如果棧頂節點(head)已經持有另外一個數據節點,說明棧頂節點正在匹配,則幫助此節點進行匹配操作,然後繼續從第一步開始迴圈。

TransferStack. awaitFulfill()

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    //計算截止時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    //計算自旋次數
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())//當前執行緒被中斷
            //取消對給定節點s的匹配節點的等待
            s.tryCancel();
        SNode m = s.match;//獲取給定節點s的match節點
        if (m != null)//已經匹配到,返回匹配節點
            return m;
        if (timed) {
            //超時處理
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();//超時,取消s節點的匹配,match指向自身
                continue;
            }
        }
        if (spins > 0)
            //spins-1
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            //設定給定節點s的waiter為當前執行緒
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)//沒有設定超時,直接阻塞
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)//阻塞指定超時時間
            LockSupport.parkNanos(this, nanos);
    }
}

說明:如果當前操作是一個不計時操作,或者是一個還未到超時時間的操作,就構建新的節點壓入棧頂。然後呼叫此方法自旋/阻塞等待給定節點s被匹配。 當呼叫此方法時,所傳引數節點s一定是在棧頂,節點真正阻塞前會先自旋,以防生產者和消費者到達的時間點非常接近時也被 park

當節點/執行緒需要阻塞時,首先設定waiter欄位為當前執行緒,然後在真正阻塞之前重新檢查一下waiter的狀態,因為線上程競爭中,需要確認waiter沒有被其他執行緒佔用。 從主迴圈返回的檢查順序可以反映出中斷優先於正常返回。除了不計時操作(poll/offer)不會檢查中斷,而是直接在transfer方法中入棧等待匹配。

TransferQueue.transfer()

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);//判斷put or take

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null) {               //尾節點滯後,更新尾節點 lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            //為當前操作構造新節點,並放到隊尾
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //推進tail
            advanceTail(t, s);              // swing tail and wait
            //等待匹配,並返回匹配節點的item,如果取消等待則返回該節點s
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s); //等待被取消,清除s節點
                return null;
            }

            if (!s.isOffList()) {           // s節點尚未出列 not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;//item指向自身
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

            //take
        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   //m.item=m, m cancelled
                !m.casItem(x, e)) {         // 匹配,CAS修改item為給定元素e lost CAS
                advanceHead(h, m);          // 推進head,繼續向後查詢 dequeue and retry
                continue;
            }

            advanceHead(h, m);              //匹配成功,head出列 successfully fulfilled
            LockSupport.unpark(m.waiter);   //喚醒被匹配節點m的執行緒
            return (x != null) ? (E)x : e;
        }
    }
}

說明:基本演算法是迴圈嘗試以下兩個動作中的其中一個:

  1. 若佇列為空或者佇列中的尾節點(tail)和自己的模式相同,則把當前節點新增到佇列尾,呼叫awaitFulfill等待節點被匹配。匹配成功後返回匹配節點的 item,如果等待節點被中斷或等待超時返回null。在此期間會不斷檢查tail節點,如果tail節點被其他執行緒修改,則向後推進tail繼續迴圈嘗試。注:TransferQueue 的 awaitFulfill方法與 TransferStack.awaitFulfill演算法一致,後面就不再講解了。

  2. 如果當前操作模式與尾節點(tail)不同,說明可以進行匹配,則從佇列頭節點head開始向後查詢一個互補節點進行匹配,嘗試通過CAS修改互補節點的item欄位為給定元素e,匹配成功後向後推進head,並喚醒被匹配節點的waiter執行緒,最後返回匹配節點的item

棧/佇列節點清除的對比(clean方法)

在佇列和棧中進行清理的方式不同: 對於佇列來說,如果節點被取消,我們幾乎總是可以以 O1 的時間複雜度移除節點。但是如果節點在隊尾,它必須等待後面節點的取消。 對於棧來說,我們可能需要 O(n) 的時間複雜度去遍歷整個棧,然後確定節點可被移除,但這可以與訪問棧的其他執行緒並行執行。

下面我們來看一下 TransferStack 和 TransferQueue 對節點清除方法的優化:

TransferStack.clean(SNode s)

void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // Absorb cancelled nodes at head
    //找到有效head
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // Unsplice embedded nodes
    //移除head到past中已取消節點的連結
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

說明:在最壞的情況下可能需要遍歷整個棧來解除給定節點s的連結(例如給定節點在棧底)。在併發情況下,如果有其他執行緒已經移除給定節點s,當前執行緒可能無法看到,但是我們可以使用這樣一種演算法: 使用s.next作為past節點,如果past節點已經取消,則使用past.next節點,然後依次解除從headpast中已取消節點的連結。在這裡不會做更深的檢查,因為為了找到失效節點而進行兩次遍歷是不值得的。

TransferQueue.clean(QNode pred, QNode s)

void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        //找到有效head節點
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        if (t == h)//佇列為空,直接返回
            return;
        QNode tn = t.next;
        if (t != tail)//tail節點被其他執行緒修改,重新迴圈
            continue;
        //找到tail節點
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        if (s != t) {        // If not tail, try to unsplice
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))//cas解除s的連結
                return;
        }
        //s是佇列尾節點,此時無法刪除s,只能去清除cleanMe節點
        QNode dp = cleanMe;
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;
            QNode dn;
            if (d == null ||               // d is gone or
                d == dp ||                 // d is off list or
                !d.isCancelled() ||        // d not cancelled or
                (d != t &&                 // d not tail and
                 (dn = d.next) != null &&  //   has successor
                 dn != d &&                //   that is on list
                 dp.casNext(d, dn)))       // d unspliced
                casCleanMe(dp, null);
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))//原cleanMe為空,標記pred為cleanMe,延遲清除s節點
            return;          // Postpone cleaning s
    }
}

說明:方法引數中s為已經取消的節點,preds的前繼節點。 任何時候在佇列中都存在一個不能刪除的節點,也就是最後被插入的那個節點(tail節點)。為了滿足這一點,在 TransferQueue 中維護了一個cleanMe節點引用。當給定s節點為tail節點時,首先刪除cleanMe節點引用;然後儲存s的前繼節點作為cleanMe節點,在下次清除操作時再清除節點。這樣保證了在s節點和cleanMe節點中至少有一個是可以刪除的。

小結

本章重點:理解 SynchronousQueue 中雙重棧和雙重佇列的實現;理解 SynchronousQueue 的阻塞演算法

作者:泰迪的bagwell 連結:https://www.jianshu.com/p/c4855acb57ec 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。