集合-LinkedTransferQueue原始碼解析
問題
(1)LinkedTransferQueue是什麼東東?
(2)LinkedTransferQueue是怎麼實現阻塞佇列的?
(3)LinkedTransferQueue是怎麼控制併發安全的?
(4)LinkedTransferQueue與SynchronousQueue有什麼異同?
簡介
LinkedTransferQueue是LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合體,它綜合了這三者的方法,並且提供了更加高效的實現方式。
繼承體系
LinkedTransferQueue實現了TransferQueue介面,而TransferQueue介面是繼承自BlockingQueue的,所以LinkedTransferQueue也是一個阻塞佇列。
TransferQueue介面中定義了以下幾個方法:
// 嘗試移交元素 boolean tryTransfer(E e); // 移交元素 void transfer(E e) throws InterruptedException; // 嘗試移交元素(有超時時間) boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 判斷是否有消費者 boolean hasWaitingConsumer(); // 檢視消費者的數量 int getWaitingConsumerCount();
主要是定義了三個移交元素的方法,有阻塞的,有不阻塞的,有超時的。
儲存結構
LinkedTransferQueue使用了一個叫做dual data structure
的資料結構,或者叫做dual queue
,譯為雙重資料結構或者雙重佇列。
雙重佇列是什麼意思呢?
放取元素使用同一個佇列,佇列中的節點具有兩種模式,一種是資料節點,一種是非資料節點。
放元素時先跟佇列頭節點對比,如果頭節點是非資料節點,就讓他們匹配,如果頭節點是資料節點,就生成一個數據節點放在佇列尾端(入隊)。
取元素時也是先跟佇列頭節點對比,如果頭節點是資料節點,就讓他們匹配,如果頭節點是非資料節點,就生成一個非資料節點放在佇列尾端(入隊)。
用圖形來表示就是下面這樣:
不管是放元素還是取元素,都先跟頭節點對比,如果二者模式不一樣就匹配它們,如果二者模式一樣,就入隊。
原始碼分析
主要屬性
// 頭節點 transient volatile Node head; // 尾節點 private transient volatile Node tail; // 放取元素的幾種方式: // 立即返回,用於非超時的poll()和tryTransfer()方法中 private static final int NOW = 0; // for untimed poll, tryTransfer // 非同步,不會阻塞,用於放元素時,因為內部使用無界單鏈表儲存元素,不會阻塞放元素的過程 private static final int ASYNC = 1; // for offer, put, add // 同步,呼叫的時候如果沒有匹配到會阻塞直到匹配到為止 private static final int SYNC = 2; // for transfer, take // 超時,用於有超時的poll()和tryTransfer()方法中 private static final int TIMED = 3; // for timed poll, tryTransfer
主要內部類
static final class Node { // 是否是資料節點(也就標識了是生產者還是消費者) final boolean isData; // false if this is a request node // 元素的值 volatile Object item; // initially non-null if isData; CASed to match // 下一個節點 volatile Node next; // 持有元素的執行緒 volatile Thread waiter; // null until waiting }
典型的單鏈表結構,內部除了儲存元素的值和下一個節點的指標外,還包含了是否為資料節點和持有元素的執行緒。
內部通過isData區分是生產者還是消費者。
主要構造方法
public LinkedTransferQueue() { } public LinkedTransferQueue(Collection<? extends E> c) { this(); addAll(c); }
只有這兩個構造方法,且沒有初始容量,所以是無界的一個阻塞佇列。
入隊
四個方法都是一樣的,使用非同步的方式呼叫xfer()方法,傳入的引數都一模一樣。
public void put(E e) { // 非同步模式,不會阻塞,不會超時 // 因為是放元素,單鏈表儲存,會一直往後加 xfer(e, true, ASYNC, 0); } public boolean offer(E e, long timeout, TimeUnit unit) { xfer(e, true, ASYNC, 0); return true; } public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
xfer(E e, boolean haveData, int how, long nanos)的引數分別是:
(1)e表示元素;
(2)haveData表示是否是資料節點,
(3)how表示放取元素的方式,上面提到的四種,NOW、ASYNC、SYNC、TIMED;
(4)nanos表示超時時間;
出隊
出隊的四個方法也是直接或間接的呼叫xfer()方法,放取元素的方式和超時規則略微不同,本質沒有大的區別。
public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } public E take() throws InterruptedException { // 同步模式,會阻塞直到取到元素 E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 有超時時間 E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { // 立即返回,沒取到元素返回null return xfer(null, false, NOW, 0); }
取元素就各有各的玩法了,有同步的,有超時的,有立即返回的。
移交元素的方法
public boolean tryTransfer(E e) { // 立即返回 return xfer(e, true, NOW, 0) == null; } public void transfer(E e) throws InterruptedException { // 同步模式 if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 有超時時間 if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
請注意第二個引數,都是true,也就是這三個方法其實也是放元素的方法。
這裡xfer()方法的幾種模式到底有什麼區別呢?請看下面的分析。
神奇的xfer()方法
private E xfer(E e, boolean haveData, int how, long nanos) { // 不允許放入空元素 if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed // 外層迴圈,自旋,失敗就重試 retry: for (;;) { // restart on append race // 下面這個for迴圈用於控制匹配的過程 // 同一時刻佇列中只會儲存一種型別的節點 // 從頭節點開始嘗試匹配,如果頭節點被其它執行緒先一步匹配了 // 就再嘗試其下一個,直到匹配到為止,或者到佇列中沒有元素為止 for (Node h = head, p = h; p != null;) { // find & match first node // p節點的模式 boolean isData = p.isData; // p節點的值 Object item = p.item; // p沒有被匹配到 if (item != p && (item != null) == isData) { // unmatched // 如果兩者模式一樣,則不能匹配,跳出迴圈後嘗試入隊 if (isData == haveData) // can't match break; // 如果兩者模式不一樣,則嘗試匹配 // 把p的值設定為e(如果是取元素則e是null,如果是放元素則e是元素值) if (p.casItem(item, e)) { // match // 匹配成功 // for裡面的邏輯比較複雜,用於控制多執行緒同時放取元素時出現競爭的情況的 // 看不懂可以直接跳過 for (Node q = p; q != h;) { // 進入到這裡可能是頭節點已經被匹配,然後p會變成h的下一個節點 Node n = q.next; // update by 2 unless singleton // 如果head還沒變,就把它更新成新的節點 // 並把它刪除(forgetNext()會把它的next設為自己,也就是從單鏈表中刪除了) // 這時為什麼要把head設為n呢?因為到這裡了,肯定head本身已經被匹配掉了 // 而上面的p.casItem()又成功了,說明p也被當前這個元素給匹配掉了 // 所以需要把它們倆都出佇列,讓其它執行緒可以從真正的頭開始,不用重複檢查了 if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry // 如果新的頭節點為空,或者其next為空,或者其next未匹配,就重試 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 喚醒p中等待的執行緒 LockSupport.unpark(p.waiter); // 並返回匹配到的元素 return LinkedTransferQueue.<E>cast(item); } } // p已經被匹配了或者嘗試匹配的時候失敗了 // 也就是其它執行緒先一步匹配了p // 這時候又分兩種情況,p的next還沒來得及修改,p的next指向了自己 // 如果p的next已經指向了自己,就重新取head重試,否則就取其next重試 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 到這裡肯定是佇列中儲存的節點型別和自己一樣 // 或者佇列中沒有元素了 // 就入隊(不管放元素還是取元素都得入隊) // 入隊又分成四種情況: // NOW,立即返回,沒有匹配到立即返回,不做入隊操作 // ASYNC,非同步,元素入隊但當前執行緒不會阻塞(相當於無界LinkedBlockingQueue的元素入隊) // SYNC,同步,元素入隊後當前執行緒阻塞,等待被匹配到 // TIMED,有超時,元素入隊後等待一段時間被匹配,時間到了還沒匹配到就返回元素本身 // 如果不是立即返回 if (how != NOW) { // No matches available // 新建s節點 if (s == null) s = new Node(e, haveData); // 嘗試入隊 Node pred = tryAppend(s, haveData); // 入隊失敗,重試 if (pred == null) continue retry; // lost race vs opposite mode // 如果不是非同步(同步或者有超時) // 就等待被匹配 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } } private Node tryAppend(Node s, boolean haveData) { // 從tail開始遍歷,把s放到連結串列尾端 for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail // 如果首尾都是null,說明連結串列中還沒有元素 if (p == null && (p = head) == null) { // 就讓首節點指向s // 注意,這裡插入第一個元素的時候tail指標並沒有指向s if (casHead(null, s)) return s; // initialize } else if (p.cannotPrecede(haveData)) // 如果p無法處理,則返回null // 這裡無法處理的意思是,p和s節點的型別不一樣,不允許s入隊 // 比如,其它執行緒先入隊了一個數據節點,這時候要入隊一個非資料節點,就不允許, // 佇列中所有的元素都要保證是同一種類型的節點 // 返回null後外面的方法會重新嘗試匹配重新入隊等 return null; // lost race vs opposite mode else if ((n = p.next) != null) // not last; keep traversing // 如果p的next不為空,說明不是最後一個節點 // 則讓p重新指向最後一個節點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // 如果CAS更新s為p的next失敗 // 則說明有其它執行緒先一步更新到p的next了 // 就讓p指向p的next,重新嘗試讓s入隊 p = p.next; // re-read on CAS failure else { // 到這裡說明s成功入隊了 // 如果p不等於t,就更新tail指標 // 還記得上面插入第一個元素時tail指標並沒有指向新元素嗎? // 這裡就是用來更新tail指標的 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } // 返回p,即s的前一個元素 return p; } } } private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { // 如果是有超時的,計算其超時時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 當前執行緒 Thread w = Thread.currentThread(); // 自旋次數 int spins = -1; // initialized after first item and cancel checks // 隨機數,隨機讓一些自旋的執行緒讓出CPU ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // 如果s元素的值不等於e,說明它被匹配到了 if (item != e) { // matched // assert item != s; // 把s的item更新為s本身 // 並把s中的waiter置為空 s.forgetContents(); // avoid garbage // 返回匹配到的元素 return LinkedTransferQueue.<E>cast(item); } // 如果當前執行緒中斷了,或者有超時的到期了 // 就更新s的元素值指向s本身 if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel // 嘗試解除s與其前一個節點的關係 // 也就是刪除s節點 unsplice(pred, s); // 返回元素的值本身,說明沒匹配到 return e; } // 如果自旋次數小於0,就計算自旋次數 if (spins < 0) { // establish spins at/near front // spinsFor()計算自旋次數 // 如果前面有節點未被匹配就返回0 // 如果前面有節點且正在匹配中就返回一定的次數,等待 if ((spins = spinsFor(pred, s.isData)) > 0) // 初始化隨機數 randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // spin // 還有自旋次數就減1 --spins; // 並隨機讓出CPU if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield } else if (s.waiter == null) { // 更新s的waiter為當前執行緒 s.waiter = w; // request unpark then recheck } else if (timed) { // 如果有超時,計算超時時間,並阻塞一定時間 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { // 不是超時的,直接阻塞,等待被喚醒 // 喚醒後進入下一次迴圈,走第一個if的邏輯就返回匹配的元素了 LockSupport.park(this); } } }
這三個方法裡的內容特別複雜,很大一部分程式碼都是在控制執行緒安全,各種CAS,我們這裡簡單描述一下大致的邏輯:
(1)來了一個元素,我們先檢視佇列頭的節點,是否與這個元素的模式一樣;
(2)如果模式不一樣,就嘗試讓他們匹配,如果頭節點被別的執行緒先匹配走了,就嘗試與頭節點的下一個節點匹配,如此一直往後,直到匹配到或到連結串列尾為止;
(3)如果模式一樣,或者到連結串列尾了,就嘗試入隊;
(4)入隊的時候有可能連結串列尾修改了,那就尾指標後移,再重新嘗試入隊,依此往復;
(5)入隊成功了,就自旋或阻塞,阻塞了就等待被其它執行緒匹配到並喚醒;
(6)喚醒之後進入下一次迴圈就匹配到元素了,返回匹配到的元素;
(7)是否需要入隊及阻塞有四種情況:
a)NOW,立即返回,沒有匹配到立即返回,不做入隊操作
對應的方法有:poll()、tryTransfer(e)
b)ASYNC,非同步,元素入隊但當前執行緒不會阻塞(相當於無界LinkedBlockingQueue的元素入隊)
對應的方法有:add(e)、offer(e)、put(e)、offer(e, timeout, unit)
c)SYNC,同步,元素入隊後當前執行緒阻塞,等待被匹配到
對應的方法有:take()、transfer(e)
d)TIMED,有超時,元素入隊後等待一段時間被匹配,時間到了還沒匹配到就返回元素本身
對應的方法有:poll(timeout, unit)、tryTransfer(e, timeout, unit)
總結
(1)LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合體;
(2)LinkedTransferQueue的實現方式是使用一種叫做雙重佇列
的資料結構;
(3)不管是取元素還是放元素都會入隊;
(4)先嚐試跟頭節點比較,如果二者模式不一樣,就匹配它們,組成CP,然後返回對方的值;
(5)如果二者模式一樣,就入隊,並自旋或阻塞等待被喚醒;
(6)至於是否入隊及阻塞有四種模式,NOW、ASYNC、SYNC、TIMED;
(7)LinkedTransferQueue全程都沒有使用synchronized、重入鎖等比較重的鎖,基本是通過 自旋+CAS 實現;
(8)對於入隊之後,先自旋一定次數後再呼叫LockSupport.park()或LockSupport.parkNanos阻塞;
彩蛋
LinkedTransferQueue與SynchronousQueue(公平模式)有什麼異同呢?
(1)在java8中兩者的實現方式基本一致,都是使用的雙重佇列;
(2)前者完全實現了後者,但比後者更靈活;
(3)後者不管放元素還是取元素,如果沒有可匹配的元素,所在的執行緒都會阻塞;
(4)前者可以自己控制放元素是否需要阻塞執行緒,比如使用四個新增元素的方法就不會阻塞執行緒,只入隊元素,使用transfer()會阻塞執行緒;
(5)取元素兩者基本一樣,都會阻塞等待有新的元素進入被匹配到;