1. 程式人生 > >JDK併發工具類原始碼學習系列——SynchronousQueue

JDK併發工具類原始碼學習系列——SynchronousQueue

SynchronousQueue是一種特殊的阻塞佇列,不同於LinkedBlockingQueue、ArrayBlockingQueue和PriorityBlockingQueue,其內部沒有任何容量,任何的入隊操作都需要等待其他執行緒的出隊操作,反之亦然。如果將SynchronousQueue用於生產者/消費者模式,那麼相當於生產者和消費者手遞手交易,即生產者生產出一個貨物,則必須等到消費者過來取貨,方可完成交易。
SynchronousQueue有一個fair選項,如果fair為true,稱為fair模式,否則就是unfair模式。fair模式使用一個先進先出的佇列儲存生產者或者消費者執行緒,unfair模式則使用一個後進先出的棧儲存。

基本原理

SynchronousQueue通過將入隊出隊的執行緒繫結到佇列的節點上,並藉助LockSupport的park()和unpark()實現等待,先到達的執行緒A需呼叫LockSupport的park()方法將當前執行緒進入阻塞狀態,知道另一個與之匹配的執行緒B呼叫LockSupport.unpark(Thread)來喚醒在該節點上等待的執行緒A。
基本邏輯:

  1. 初始狀態佇列為null
  2. 當一個執行緒到達,如果佇列為null,無與之匹配的執行緒,則進入佇列等待;佇列不為null,參考3
  3. 當另一個執行緒到達,如果佇列不為null,則判斷佇列中的第一個元素(針對fair和unfair不同)是否與其匹配,如果匹配則完成交易,不匹配則也入隊;佇列為null,參考2

常用方法解析

在深入分析其實現機制之前,我們先了解對於SynchronousQueue可執行哪些操作,由於SynchronousQueue的容量為0,所以一些針對集合的操作,如:isEmpty()/size()/clear()/remove(Object)/contains(Object)等操作都是無意義的,同樣peek()也總是返回null。所以針對SynchronousQueue只有兩類操作:

  • 入隊(put(E)/offer(E, long, TimeUnit)/offer(E))
  • 出隊(take()/poll(long, TimeUnit)/poll())

這兩類操作內部都是呼叫Transferer的transfer(Object, boolean, long)方法,通過第一個引數是否為null,來區分是生產者還是消費者(生產者不為null)。
針對以上情況,我們將著重分析Transferer的transfer(Object, boolean, long)方法,這裡由於兩種不同的公平模式,會存在兩個Transferer的派生類:

public SynchronousQueue(boolean fair) {
    transferer = (fair)? new TransferQueue() : new TransferStack();
}

可見fair模式使用TransferQueue,unfair模式使用TransferStack,下面我們將分別對這兩種模式進行著重分析。

fair模式

fair模式使用一個FIFO的佇列儲存執行緒,TransferQueue的結構如下:

/** Dual Queue */
static final class TransferQueue extends Transferer {
    /** Node class for TransferQueue. */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

        ...
    }

    /** Head of queue */
    transient volatile QNode head;
    /** Tail of queue */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it cancelled.
     */
    transient volatile QNode cleanMe;

    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }

    ...
}

以上是TransferQueue的大致結構,可以看到TransferQueue同一個普通的佇列,同時存在一個指向佇列頭部的指標——head,和一個指向佇列尾部的指標——tail;cleanMe的存在主要是解決不可清楚佇列的尾節點的問題,後面會介紹到;佇列的節點通過內部類QNode封裝,QNode包含四個變數:

  • next:指向佇列中的下一個節點
  • item:節點包含的資料
  • waiter:等待在該節點上的執行緒
  • isData:表示該節點由生產者建立還是由消費者建立,由於生產者是放入資料,所以isData==true,而消費者==false

其他的內容就是一些CAS變數以及操作,下面主要分析TransferQueue的三個重要方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。這三個方法是TransferQueue的核心,入口是transfer(),下面具體看程式碼。

transfer

/**
 * @By Vicky:交換資料,生產者和消費者通過e==null來區分
 */
Object transfer(Object e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);// e==null,則isData==false,else idData==true

    for (;;) {// 迴圈
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // 無視即可,具體資訊在方法開始的註釋中有提到
            continue;                       // spin

        // h==t佇列為null,tail的isData==isData表示該佇列中的等待的執行緒與當前執行緒是相同模式
        //(同為生產者,或者同為消費者)(佇列中只存在一種模式的執行緒)
        // 此時需要將該執行緒插入到佇列中進行等待
        if (h == t || t.isData == isData) { 
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            // 這裡的目的是為了幫助其他執行緒完成入隊操作
            if (tn != null) {               // lagging tail
                // 原子性將tail從t更新為tn,即將tail往後移動,直到佇列的最後一個元素
                advanceTail(t, tn);
                continue;
            }
            // 如果nanos<=0則說明不等待,那麼到這裡已經說到佇列沒有可匹配的執行緒,所以直接返回null即可
            if (timed && nanos <= 0)        // can't wait
                return null;
            // 僅初始化一次s,節點s會儲存isData資訊作為生產者和消費者的區分
            if (s == null)
                s = new QNode(e, isData);
            // 原子性的更新t的next指標指向s,上面將tail從t更新為tn就是為了處理此處剩下的操作
            // 由於此處插入一個節點分成了兩個步驟,所以過程中會插入其他執行緒,導致看到不一致狀態
            // 所以其他執行緒會執行剩下的步驟幫助其完成入隊操作
            if (!t.casNext(null, s))        // failed to link in
                continue;

            // 如果自己執行失敗沒有關係,會有其他執行緒幫忙執行完成的,所以才無需鎖,類似ConcurrentLinkedQueue
            advanceTail(t, s);              // swing tail and wait
            // 等待匹配
            Object x = awaitFulfill(s, e, timed, nanos);
            // 這裡有兩種情況:
            //  A:匹配完成,返回資料
            //  B:等待超時/取消,返回原節點s
            if (x == s) {                   // wait was cancelled
                // 情況B則需要清除掉節點s
                clean(t, s);
                return null;
            }

            // 情況A,則匹配成功了,但是還需要將該節點從佇列中移除
            //  由於FIFO原則,所以匹配上的元素必然是佇列的第一個元素,所以只需要移動head即可
            if (!s.isOffList()) {           // not already unlinked
                // 移動head指向s,則下次匹配從s.next開始
                advanceHead(t, s);          // unlink if head
                // 清除對節點中儲存的資料的引用,GC友好
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null)? x : e;

        } else {                            // complementary-mode
            // 進行匹配,從佇列的頭部開始,即head.next,非head
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            // 判斷該節點的isData是否與當前執行緒的isData匹配
            // 相等則說明m已經匹配過了,因為正常情況是不相等才對
            // x==m說明m被取消了,見QNode的tryCancel()方法
            // CAS設定m.item為e,這裡的e,如果是生產者則是資料,消費者則是null,
            // 所以m如果是生產者,則item變為null,消費者則變為生產者的資料
            // CAS操作失敗,則直接將m出隊,CAS失敗說明m已經被其他執行緒匹配了,所以將其出隊,然後retry
            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // 與m匹配成功,將m出隊,並喚醒等待在m上的執行緒m.waiter
            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null)? x : e;
        }
    }
}

從上面的程式碼可以看出TransferQueue.transfer()的整體流程:

  1. 判斷當前佇列是否為null或者隊尾執行緒是否與當前執行緒匹配,為null或者不匹配都將進行入隊操作
  2. 入隊主要很簡單,分成兩步:修改tail的next為新的節點,修改tail為新的節點,這兩步操作有可能分在兩個不同的執行緒執行,不過不影響執行結果
  3. 入隊之後需要將當前執行緒阻塞,呼叫LockSupport.park()方法,直到打斷/超時/被匹配的執行緒喚醒
  4. 如果被取消,則需要呼叫clean()方法進行清除
  5. 由於FIFO,所以匹配總是發生在佇列的頭部,匹配將修改等待節點的item屬性傳遞資料,同時喚醒等待在節點上的執行緒

awaitFulfill

下面看看具體如何讓一個執行緒進入阻塞。

/**
 *@ By Vicky:等待匹配,該方法會進入阻塞,直到三種情況下才返回:
 *  a.等待被取消了,返回值為s
 *  b.匹配上了,返回另一個執行緒傳過來的值
 *  c.執行緒被打斷,會取消,返回值為s
 */
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
    // timed==false,則不等待,lastTime==0即可
    long lastTime = (timed)? System.nanoTime() : 0;
    // 當前執行緒
    Thread w = Thread.currentThread();
    // 迴圈次數,原理同自旋鎖,如果不是佇列的第一個元素則不自旋,因為壓根輪不上他,自旋只是浪費CPU
    // 如果等待的話則自旋的次數少些,不等待就多些
    int spins = ((head.next == s) ?
                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())// 支援打斷
            s.tryCancel(e);
        // 如果s的item不等於e,有三種情況:
        // a.等待被取消了,此時x==s
        // b.匹配上了,此時x==另一個執行緒傳過來的值
        // c.執行緒被打斷,會取消,此時x==s
        // 不管是哪種情況都不要再等待了,返回即可
        Object x = s.item;
        if (x != e)
            return x;
        // 等到,直接超時取消
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(e);
                continue;
            }
        }
        // 自旋,直到spins==0,進入等待
        if (spins > 0)
            --spins;
        // 設定等待執行緒
        else if (s.waiter == null)
            s.waiter = w;
        // 呼叫LockSupport.park進入等待
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

awaitFulfill()主要涉及自旋以及LockSupport.park()兩個關鍵點,自旋可去了解自旋鎖的原理。

自旋鎖原理:通過空迴圈則霸佔著CPU,避免當前執行緒進入睡眠,因為睡眠/喚醒是需要進行執行緒上下文切換的,所以如果執行緒睡眠的時間很段,那麼使用空迴圈能夠避免執行緒進入睡眠的耗時,從而快速響應。但是由於空迴圈會浪費CPU,所以也不能一直迴圈。自旋鎖一般適合同步快很小,競爭不是很激烈的場景。

LockSupport.park()可到API文件進行了解。

clean

下面再看看如何清除被取消的節點。

/**
 *@By Vicky:清除節點被取消的節點 
 */
void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    // 如果pred.next!=s則說明s已經出隊了
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        // 從佇列頭部開始遍歷,遇到被取消的節點則將其出隊 
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        // t==h則佇列為null
        if (t == h)
            return;
        QNode tn = t.next;
        if (t != tail)
            continue;
        // 幫助其他執行緒入隊
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        // 只能出隊非尾節點
        if (s != t) {        // If not tail, try to unsplice
            // 出隊方式很簡單,將pred.next指向s.next即可
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))
                return;
        }
        // 如果s是隊尾元素,那麼就需要cleanMe出場了,如果cleanMe==null,則只需將pred賦值給cleanMe即可,
        // 賦值cleanMe的意思是等到s不是隊尾時再進行清除,畢竟隊尾只有一個
        // 同時將上次的cleanMe清除掉,正常情況下此時的cleanMe已經不是隊尾了,因為當前需要清除的節點是隊尾
        // (上面說的cleanMe其實是需要清除的節點的前繼節點)
        QNode dp = cleanMe;
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;
            QNode dn;
            // d==null說明需要清除的節點已經沒了
            // d==dp說明dp已經被清除了,那麼dp.next也一併被清除了
            // 如果d未被取消,說明哪裡出錯了,將cleanMe清除,不清除這個節點了
            // 後面括號將清除cleanMe的next出局,前提是cleanMe.next沒有已經被出局
            if (d == null ||               // d is gone or
                d == dp ||                 // d is off list or
                !d.isCancelled() ||        // d not cancelled or
                (d != t &&                 // d not tail and
                 (dn = d.next) != null &&  //   has successor
                 dn != d &&                //   that is on list
                 dp.casNext(d, dn)))       // d unspliced
                casCleanMe(dp, null);
            // dp==pred說明cleanMe.next已經其他執行緒被更新了
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))
            return;          // Postpone cleaning s
    }
}

清除節點時有個原則:不能清除隊尾節點。所以如果對尾節點需要被清除,則將其儲存到cleanMe變數,等待下次進行清除。在清除cleanMe時可能說的有點模糊,因為涉及到太多的併發會出現很多情況,所以if條件太多,導致難以分析全部情況。

以上就是TransferQueue的操作邏輯,下面看看後進先出的TransferStack。

unfair模式

unfair模式使用一個LIFO的佇列儲存執行緒,TransferStack的結構如下:

/** Dual stack */
static final class TransferStack extends Transferer {
    /* Modes for SNodes, ORed together in node fields */
    /** Node represents an unfulfilled consumer */
    static final int REQUEST    = 0;// 消費者請求資料
    /** Node represents an unfulfilled producer */
    static final int DATA       = 1;// 生產者生產資料
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2;// 正在匹配中...

    /** 只需要判斷mode的第二位是否==1即可,==1則正在匹配中...*/
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;        // next node in stack
        volatile SNode match;       // the node matched to this
        volatile Thread waiter;     // to control park/unpark
        Object item;                // data; or null for REQUESTs
        int mode;
        // Note: item and mode fields don't need to be volatile
        // since they are always written before, and read after,
        // other volatile/atomic operations.

        SNode(Object item) {
            this.item = item;
        }
    }

    /** The head (top) of the stack */
    volatile SNode head;

    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }
}

TransferStacks比TransferQueue的結構複雜些。使用一個head指向棧頂元素,使用內部類SNode封裝棧中的節點資訊,SNode包含5個變數:

  • next:指向棧中下一個節點
  • match:與之匹配的節點
  • waiter:等待的執行緒
  • item:資料
  • mode:模式,對應REQUEST/DATA/FULFILLING(第三個並不是FULFILLING,而是FULFILLING | REQUEST或者FULFILLING | DATA)

SNode的5個變數,三個是volatile的,另外兩個item和mode沒有volatile修飾,程式碼註釋給出的解釋是:對這兩個變數的寫總是發生在volatile/原子操作的之前,讀總是發生在volatile/原子操作的之後。

上面提到SNode.mode的三個常量表示棧中節點的狀態,f分別為:

  • REQUEST:0,消費者的請求生成的節點
  • DATA:1,生產者的請求生成的節點
  • FULFILLING:2,正在匹配中的節點,具體對應的mode值是FULFILLING | REQUEST和FULFILLING | DATA

其他內部基本同TransferQueue,不同之處是當匹配到一個節點時並非是將被匹配的節點出棧,而是將匹配的節點入棧,然後同時將匹配上的兩個節點一起出棧。下面我們參照TransferQueue來看看TransferStacks的三個方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。

transfer

/**
 * @By Vicky:交換資料,生產者和消費者通過e==null來區分
 */
Object transfer(Object e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    int mode = (e == null)? REQUEST : DATA;// 根據e==null判斷生產者還是消費者,對應不同的mode值

    for (;;) {
        SNode h = head;
        // 棧為null或者棧頂元素的模式同當前模式,則進行入棧操作
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 不等待,則直接返回null,返回之前順帶清理下被取消的元素
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {// 入棧,更新棧頂為新節點
                // 等待,返回值m==s,則被取消,需清除
                SNode m = awaitFulfill(s, timed, nanos);
                // m==s說明s被取消了,清除
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // 幫忙出棧
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 消費者則返回生產者的資料,生產者則返回自己的資料
                return mode == REQUEST? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill   // 棧頂未開始匹配,則開始匹配
            // h被取消,則出棧
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // 更新棧頂為新插入的節點,並更新節點的mode為FULFILLING,對應判斷是否正在出棧的方法
            // 匹配需要先將待匹配的節點入棧,所以不管是匹配還是不匹配都需要建立一個節點入棧
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 迴圈直到找到一個可以匹配的節點
                for (;;) { // loop until matched or waiters disappear
                    // m即與s匹配的節點
                    SNode m = s.next;       // m is s's match
                    // m==null說明棧s之後無元素了,直接將棧頂設定為null,並重新進行最外層的迴圈
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // 將s設定為m的匹配節點,並更新棧頂為m.next,即將s和m同時出棧
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (mode == REQUEST)? m.item : s.item;
                    } else                  // lost match
                        // 設定匹配失敗,則說明m正準備出棧,幫助出棧
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller // 棧頂已開始匹配,幫助匹配
            // 此處的操作邏輯同上面的操作邏輯一致,目的就是幫助上面進行操作,因為此處完成匹配需要分成兩步:
            // a.m.tryMatch(s)和b.casHead(s, mn)
            // 所以必然會插入其他執行緒,只要插入的執行緒也按照這個步驟執行那麼就避免了不一致問題
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

從上面的程式碼可以看出TransferStack.transfer()的整體流程:

  1. 判斷當前棧是否為null或者棧頂執行緒是否與當前執行緒匹配,為null或者不匹配都將進行入棧操作
  2. 入棧主要很簡單,分成兩步:插入一個節點入棧,該步無需同步,第二步需要head指標指向新節點,該步通過CAS保證安全
  3. 入棧之後需要將當前執行緒阻塞,呼叫LockSupport.park()方法,直到打斷/超時/被匹配的執行緒喚醒
  4. 如果被取消,則需要呼叫clean()方法進行清除
  5. 由於LIFO,所以匹配的節點總是棧頂的兩個節點,分成兩步:原子性更新節點的match變數,更新head。由於兩步無法保證原子性,所以通過將棧頂元素的mode更新為FULFILLING,阻止其他執行緒在棧頂發生匹配時進行其他操作,同時其他執行緒需幫助棧頂進行的匹配操作

awaitFulfill

下面看看TransferStack是如何讓一個執行緒進入阻塞。

/**
 *@ By Vicky:等待匹配,邏輯大致同TransferQueue可參考閱讀
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    long lastTime = (timed)? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    SNode h = head;
    // 計算自旋的次數,邏輯大致同TransferQueue
    int spins = (shouldSpin(s)?
                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        // 如果s的match不等於null,有三種情況:
        // a.等待被取消了,此時x==s
        // b.匹配上了,此時match==另一個節點
        // c.執行緒被打斷,會取消,此時x==s
        // 不管是哪種情況都不要再等待了,返回即可
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            // 等待
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel();
                continue;
            }
        }
        // 自旋
        if (spins > 0)
            spins = shouldSpin(s)? (spins-1) : 0;
        // 設定等待執行緒
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        // 等待
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

邏輯基本同TransferQueue,不同之處是通過修改SNode的match變數標示匹配,以及取消。

clean

下面再看看如何清除被取消的節點。

/**
 * @By Vicky:清除節點
 */
void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread
    // 清除
    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // Absorb cancelled nodes at head
    // 從棧頂節點開始清除,一直到遇到未被取消的節點,或者直到s.next
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // Unsplice embedded nodes
    // 如果p本身未取消(上面的while碰到一個未取消的節點就會退出,但這個節點和past節點之間可能還有取消節點),
    // 再把p到past之間的取消節點都移除。
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

以上即全部的TransferStack的操作邏輯。

看完了TransferQueue和TransferStack的邏輯,SynchronousQueue的邏輯基本清楚了。

應用場景

SynchronousQueue的應用場景得看具體業務需求,J.U.C下有一個應用案例:Executors.newCachedThreadPool()就是使用SynchronousQueue作為任務佇列。

參考文章

相關推薦

JDK併發工具原始碼學習系列——SynchronousQueue

SynchronousQueue是一種特殊的阻塞佇列,不同於LinkedBlockingQueue、ArrayBlockingQueue和PriorityBlockingQueue,其內部沒有任何容量,任何的入隊操作都需要等待其他執行緒的出隊操作,反之亦然。如果

JDK併發工具原始碼學習系列——LinkedBlockingQueue

LinkedBlockingQueue是一個基於已連結節點的、範圍任意的 blocking queue。此佇列按 FIFO(先進先出)排序元素。佇列的頭部 是在佇列中時間最長的元素。佇列的尾部 是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列獲取操作會

JDK併發工具原始碼學習系列——ConcurrentSkipListMap(續)

上一篇介紹了ConcurrentSkipListMap的原理以及對put()方法進行了解析,本篇繼續接著上一篇往下看。 上一篇說到put()的最後一句:insertIndex(z, level);這一句是將一個新節點插入到跳錶結構中,下面看看是如何實現的。

java併發_Thread原始碼學習

一、建立執行緒 Thread在使用時,一種方式是構造一個Thread的子類,通過覆蓋run方法實現。 Thread t = new Thread() { @Override public void run() {

Java併發原始碼學習系列:掛起與喚醒執行緒LockSupport工具

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發原始碼學習系列:阻塞佇列實現之SynchronousQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

併發程式設計系列:4大併發工具的功能、原理、以及應用場景

通常我們所說的併發包也就是java.util.concurrent,集中了Java併發工具類和併發容器等,今天主要介紹Java併發程式設計的工具類,我先從Java併發工具包談起。 01 — 併發工具包涵蓋範圍 1.併發工具類 提供了比synchronized更加高階

多執行緒學習筆記六之併發工具CountDownLatch和CyclicBarrier

目錄 簡介 CountDownLatch 示例 實現分析 CountDownLatch與Thread.join() CyclicBarrier 實現分析 CountDownLatch和CyclicBarrier區別 簡介

多執行緒系列五:併發工具併發容器

一、併發容器 1.ConcurrentHashMap 為什麼要使用ConcurrentHashMap 在多執行緒環境下

Java併發原始碼學習系列:AbstractQueuedSynchronizer

[toc] > 本文基於JDK1.8 ## 本篇學習目標 - 瞭解AQS的設計思想以及重要欄位含義,如通過state欄位表示同步狀態等。 - 瞭解AQS內部維護鏈式雙向同步佇列的結構以及幾個重要指標。 - 瞭解五種重要的同步狀態。 - 明確兩種模式:共享模式和獨佔模式。 - 學習兩種模式下AQS提供的模

Java併發原始碼學習系列:CLH同步佇列及同步資源獲取與釋放

[toc] ## 本篇學習目標 - 回顧CLH同步佇列的結構。 - 學習獨佔式資源獲取和釋放的流程。 ## CLH佇列的結構 我在[Java併發包原始碼學習系列:AbstractQueuedSynchronizer#同步佇列與Node節點](https://www.cnblogs.com/summer

Java併發原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別

[toc] # Java併發包原始碼學習系列:AQS共享模式獲取與釋放資源 往期回顧: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://www.cnblogs.com/summerday152/p/14238284.html) - [Java併

Java併發原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發原始碼學習系列:阻塞佇列實現之LinkedBlockingQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發原始碼學習系列:阻塞佇列實現之PriorityBlockingQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發原始碼學習系列:阻塞佇列實現之DelayQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源

Java併發原始碼學習系列:阻塞佇列實現之LinkedTransferQueue原始碼解析

[toc] 系列傳送門: Java併發包原始碼學習系列:AbstractQueuedSynchronizer Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放 Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別 Java併發包原始碼學習系列:ReentrantLock可重

Java併發原始碼學習系列:基於CAS非阻塞併發佇列ConcurrentLinkedQueue原始碼解析

[toc] ## 非阻塞併發佇列ConcurrentLinkedQueue概述 我們之前花了很多時間瞭解學習BlockingQueue阻塞佇列介面下的各種實現,也大概對阻塞佇列的實現機制有了一定的瞭解:阻塞 + 佇列嘛。 而且其中絕大部分是完全基於獨佔鎖ReentrantLock和條件機制conditi