併發容器學習—SynchronousQueue
public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
上面是SynchronousQueue的兩個構造方法,可以看出SynchronousQueue的底層實際是建立一個轉移佇列,並且這個佇列有公平和非公平兩種模式,下面我們先來看看公平模式的轉移佇列是怎麼實現的。
2.SynchronousQueue的公平模式
不論是非公平的TransferStack佇列還是公平的TransferQueue佇列,都是對父類Transfer的實現:
//Transferer是SynchronousQueue中的一個內部類,定義了一個轉移方法 //TransferStack和TransferQueue都要實現該轉移方法 abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); }
TransferQueue的是一個底層由連結串列實現的FIFO佇列,其結點的定義如下:
static final class QNode { volatile QNode next; // 結點的後繼 volatile Object item; // 結點中儲存的資料 volatile Thread waiter; // 等待的執行緒 //是否是資料,此標識要與判斷入隊的操作是什麼操作 //即判斷相鄰兩次入隊的操作是否相同,若是相鄰兩次入隊 //的是不相同的操作(一次put一次take)那麼就要進行配對後移除出隊。 //否則,將操作入隊即可 final boolean isData; //構造方法 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } //CAS更新後繼結點 boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } //CAS更新結點中的資料 boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } //取消操作,即嘗試將item更新為結點本身 void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } //判斷節點是否應該取消,即item是不是結點本身 boolean isCancelled() { return item == this; } //判斷當前結點是否已不再佇列之中 boolean isOffList() { return next == this; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
知道了TransferQueue的底層實現,在看看TransferQueue中重要的屬性及構造方法:
static final class TransferQueue<E> extends Transferer<E> {
//佇列的隊首結點
transient volatile QNode head;
//佇列的隊尾結點
transient volatile QNode tail;
//清除標記
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
3.公平模式下的take和put操作
//將指定元素新增到此佇列,如有必要則等待另一個執行緒接收它
public void put(E e) throws InterruptedException {
//判斷轉移元素是否為null,也就是說同步佇列中不能對null元素進行轉移
//因為e元素是否為null,是transfer方法中判斷入隊操作是put還是take
//的一個依據
if (e == null) throw new NullPointerException();
//put方法中傳入transfer的e不為null
//呼叫transfer 預設不進行超時等待,如果發生中斷則丟擲中斷異常
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
//獲取並移除此佇列的頭,如有必要則等待另一個執行緒插入它
public E take() throws InterruptedException {
//take方法中傳入transfer方法的e為null
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
由take方法和put方法的原始碼可知,在公平模式下,這兩個方法本質都是對TransferQueue中transfer方法的呼叫,下面來看看TransferQueue中的transfer方法是如何實現的?
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
//根據e是否為null,標記本次入隊的是什麼操作
//e為null,isData為false,否則isData為true
boolean isData = (e != null);
for (;;) { //自旋
QNode t = tail; //獲取隊尾
QNode h = head; //獲取隊首
//判斷隊首或隊尾是否為null,實際上h和t不會為null(構造方法中已經初始化過)
if (t == null || h == null) // saw uninitialized value
continue; // spin
//判斷是否是空佇列,或者入隊操作與隊尾相同(即與隊尾都是put或者take操作)
//不論是空佇列還是與隊尾相同操作,說明只能將操作入隊,而不能進行
//匹配(操作相同,無法配對,配對要操作不同才行)
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next; //獲取隊尾的後繼結點
//判斷隊尾是否已經改變,即是否有其他執行緒搶先進行入隊或者配對操作
if (t != tail) // inconsistent read
continue; //重新進行入隊或者配對判斷
//判斷是不是有新增的與隊尾相同的入隊操作,若是,更新成新的隊尾
if (tn != null) { // lagging tail
advanceTail(t, tn); //嘗試更新隊尾結點,失敗也沒有關係
continue;
}
//判斷是否超時(timed為是否設定了超時等待操作,nanos為剩餘的等待時間)
if (timed && nanos <= 0) // can't wait
return null; //超時後直接返回null
//判斷s是否為null,若是則以e為item建立一個結點
if (s == null)
s = new QNode(e, isData);
//嘗試更新隊尾的後繼結點為s結點,失敗的話就迴圈繼續嘗試直到成功
if (!t.casNext(null, s)) // failed to link in
continue;
//在隊尾新增了一個後繼結點,那麼隊尾就應該是這個後繼結點了
//因此需要將s更新為新的隊尾結點
advanceTail(t, s); // swing tail and wait
//空旋或者阻塞直到匹配的操作到來
Object x = awaitFulfill(s, e, timed, nanos);
//到這一步,說明阻塞的操作已經配對成功或者操作已經被取消了,執行緒被喚醒了
//判斷是配對成功了,還是操作被取消了
//若是操作被取消,會設定s.item=s
if (x == s) { // wait was cancelled
clean(t, s); //清除結點s
return null; //操作已經被取消,直接返回null
}
//判斷結點s是否還在佇列中
//若是還在佇列中,且又配對操作成功,說明s結點應該是新的head
//並且若s.item不為null還需要將s.item設為s自身,等待執行緒賦null
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
//判斷是否
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
// 入隊的操作與之前的隊尾的操作不同,可以進行配對(take配put,或
// put配take)
} else {
//獲取隊首的後繼結點
QNode m = h.next;
//出現t與隊尾不同,m為null,h與隊首不同,說明佇列發生了改變
//即隊列出現了其他執行緒搶先執行了入隊或者配對的操作
if (t != tail || m == null || h != head)
continue; //迴圈重新來
//獲取後繼結點的item
Object x = m.item;
//isData == (x != null)是判斷m結點對應的操作與當前操作是否相同
//x == m 則是判斷m結點是否被取消
//!m.casItem(x, e) 則是判斷嘗試更新m結點的item為e是否成功
//以上三個判斷有一個為真,那就說明m結點已經不在佇列中或是被取消或是匹配過了
if (isData == (x != null) || x == m || !m.casItem(x, e)) {
advanceHead(h, m); //舊隊首已經過時,更新隊首
continue;
}
//更新head,到此處說明配對操作已經成功,應該將m結點變為head
//而h結點則需要移除出列
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter); //喚醒m結點對應的執行緒
//
return (x != null) ? (E)x : e;
}
}
}
//嘗試更新隊尾結點
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
//等待結點被對應的操作匹配
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
//根據timed標識即超時時間計算截止時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread(); //湖區當前執行緒的引用
//計算出自旋時間
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
//死迴圈,確保操作能成功
for (;;) {
//判斷當前執行緒是否被中斷,若是被中斷那麼取消
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item; //獲取結點對應的item物件
//判斷結點的item是否還是物件e
//生成s節點的時候,s.item是等於e的,當取消操作(item變為s)或者
//匹配了操作的時候會進行更改
if (x != e)
return x;
//判斷是否設定了超時等待功能
if (timed) {
nanos = deadline - System.nanoTime(); //計算剩餘的等待時間
if (nanos <= 0L) { //判斷是否應超時,若超時直接嘗試取消操作
s.tryCancel(e);
continue;
}
}
//自旋時間控制
if (spins > 0)
--spins; //自旋時間減少
//判斷是否需要設定等待執行緒
else if (s.waiter == null)
s.waiter = w;
else if (!timed) //沒有設定超時等待功能,直接讓讓執行緒一直掛起
LockSupport.park(this);
//設定了超時等待功能,就判斷等待時間是否大於自旋的最大時間
//若大於自旋最大時間那就讓執行緒阻塞一段時間
//否則讓執行緒自旋一段時間
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
//嘗試取消當前結點對應的操作,即將結點中item值更新成結點自身(this)
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
//更新隊首head指標,並將原隊首結點的next指向其自身,方便GC
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
//若是某個結點對應的操作被取消,那麼這個操作對應的結點需要移除出隊
//也就是清理無效結點。
/**
* clean方法中如果刪除的節點不是尾節點,那麼可以直接進行刪除,
* 如果刪除的節點是尾節點,那麼用cleanMe標記需要刪除的節點的前驅,
* 這樣在下一輪的clean的過程將會清除打了標記的節點。
*/
void clean(QNode pred, QNode s) {
s.waiter = null; //操作已經被取消,那麼就不會有等待執行緒了
//判斷s是否是pred的後繼結點
while (pred.next == s) { // Return early if already unlinked
QNode h = head; //隊首引用
QNode hn = h.next; //隊首的後繼結點
//判斷hn是否被取消,若被取消,那麼更新head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // 隊尾結點
//判斷是否是空佇列
if (t == h)
return;
QNode tn = t.next; //隊尾的後繼
//隊尾改變,說明佇列被其他執行緒修改過了
if (t != tail)
continue;
//t有後繼結點,說明隊尾該更新了
if (tn != null) {
advanceTail(t, tn);
continue;
}
//判斷s結點是否是隊尾結點
//若不是隊尾結點,則只需將s的前驅pred結點的next指向s結點的後繼結點sn
//即成功將s結點從佇列中清除
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
//到此處,說明要清除的結點就是佇列的隊尾,而隊尾不能直接刪除
QNode dp = cleanMe; //獲取標識有要刪除的結點的前驅結點
//判斷是否有需要刪除的結點,dp不為null,即cleanMe存在
//說明佇列之前有隊尾結點需要刪除
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
//當cleanMe處於以下四種情形時,cleanMe失效
//(1)cleanMe的後繼而空(cleanMe 標記的是需要刪除節點的前驅)
//(2)cleanMe的後繼等於自身,
//(3)需要刪除節點的操作沒有被取消,
//(4)被刪除的節點不是尾節點且其後繼節點有效
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null); //cleanMe失效,賦值為null
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred)) //將隊尾的前驅結點標記成cleanMe
return; // Postpone cleaning s
}
}
//用於判斷結點是否還在佇列中,若後繼結點是自身,說明已經不再佇列中
//否則還在佇列中
boolean isOffList() {
return next == this;
}
由上面對公平的SynchronousQueue的分析可知,底層是使用佇列來實現公平模式的,並且執行緒安全是通過CAS方式實現的。公平的TransferQueue佇列中會將連續相同的操作入隊,而不同的操作則會進行配對,即TransferQueue佇列中要麼沒有存放操作,要麼存放都是相同的操作(要麼都是take,要麼都是put),當有一個與佇列中的操作不相同的操作時,佇列會自動將隊首操作與之進行匹配。大致流程(操作被取消未顯示,方便理解)如下圖所示:
4.SynchronousQueue的非公平模式
SynchronousQueue的非公平模式是基於棧來實現的,我們知道棧是後進先出的(LIFO),也就是說這裡的非公平模式與ReentrantLock中的非公平模式區別巨大,後來先服務這太不公平了。
先來看看非公平模式的具體實現TransferStack的底層資料結構連結串列中結點的定義:
static final class SNode {
volatile SNode next; // 連結串列的後繼結點
volatile SNode match; // 匹配的結點
volatile Thread waiter; // 等待的執行緒
Object item; // 資料
int mode; //結點的模式
SNode(Object item) {
this.item = item;
}
//CAS方式更新後繼結點
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//嘗試匹配結點,匹配成功就將等待匹配結點對應的執行緒喚醒繼續後續操作
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
//嘗試取消結點,將match更新成結點自身即標誌著結點已處於取消狀態
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
//判斷是否被取消
boolean isCancelled() {
return match == this;
}
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
知道了結點的基本結構,在來看看TransferStack中的重要屬性及構造方法:
//TransferStack中沒有構造方法,因此只有一個空構造
/**
* TransferStack中定義了三個標記:REQUEST表示消費者,DATA表示生產者,
* FULFILLING表示操作匹配狀態。任何執行緒對TransferStack的操作都屬於
* 上述3種狀態中的一種
*/
static final class TransferStack<E> extends Transferer<E> {
//用於標記結點的型別,代表結點是消費者(對應take操作)
static final int REQUEST = 0;
//用於標記結點的型別,代表結點是生產者(對應put操作)
static final int DATA = 1;
//用於標記結點的狀態,代表結點正處於匹配狀態
static final int FULFILLING = 2;
//棧頂結點
volatile SNode head;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}
}
5.非公平模式下的take和put操作
有上文對公平模式的學習,我們知道take和put操作最終呼叫的都是transfer方法,只不過公平模式呼叫的是TransferQueue中的轉移方法,非公平模式則是呼叫TransferStack中的轉移方法
/**
* transfer的大致過程:將一個操作與棧頂的操作進行配對
* 若是配對不成功(take對應put,或put對應take),那麼直接將該操作
* 入棧;若是配對成功,即此時應該將棧頂操作出棧,但是不能直接出棧(
* 若此時其他執行緒進行入棧,那麼直接出棧會出問題),而是先將匹配的操作
* 標記成FULFILLING狀態(匹配狀態)然後入棧;當其他執行緒檢查到這個匹配
* 的過程,就會先幫助配對,在去執行自身的操作
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
//判斷當前操作是何種模式,REQUEST對應take,DATA對應put
int mode = (e == null) ? REQUEST : DATA;
for (;;) { //死迴圈
SNode h = head; //獲取棧頂
//判斷棧是否為空棧,若不為空棧,那麼棧頂操作模式與當前操作模式是否相同
//若是棧頂操作與當前操作模式相同,那麼就需要入棧該操作
if (h == null || h.mode == mode) { // empty or same-mode
//判斷是否設定了超時等待,若設定了超時等待,那等待時間是否還有剩餘
if (timed && nanos <= 0) {
//判斷棧頂是否為null,且棧頂是否被取消
//若不為null,且棧頂操作被取消,那麼就嘗試更新棧頂操作為其後繼
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else //棧頂為null或者沒被取消
return null;
//沒有設定超時等待,或超時等待時間還未到,則嘗試將當前操作入棧
} else if (casHead(h, s = snode(s, e, h, mode))) {
//s入棧成功,那麼就自旋或掛起等待匹配的操作到來在被喚醒
SNode m = awaitFulfill(s, timed, nanos); //操作自旋或掛起
//到這一步說明操作已經被取消,或者匹配到了對應的操作
//匹配結點m為結點自己本身,說明操作被取消
if (m == s) { // wait was cancelled
clean(s); //清除被取消的結點
return null;
}
//若是棧不是空棧且棧頂的後繼是s,說明操作s和其匹配的操作m,都還在棧中
//需要將其移除出棧,即更新棧頂為s的後繼
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
//根據操作型別返回對應的資料,最後返回的其實都是put進去的資料
return (E) ((mode == REQUEST) ? m.item : s.item);
}
//當前的操作與棧頂操作相匹配,進行匹配,將操作的狀態更新成FULFILLING併入棧
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // 判斷棧頂是否被取消
casHead(h, h.next); // 棧頂被取消更新棧頂
//入棧新結點s,s的後繼為h,並且s處於FULFILLING狀態(匹配狀態)
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // s的後繼m,因為s處於匹配狀態,m可能是其配對的結點
//配對結點為null
//這裡有些疑問,沒搞懂什麼情形下會出現m為null(按理不應該出現的)
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
//m的後繼結點
SNode mn = m.next;
//判斷m和s配對是否成功,配對成功嘗試更新棧頂為m的後繼
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
//沒有匹配成功,說明m結點不是s的配對結點,繼續向後尋找
s.casNext(m, mn); // help unlink
}
}
//到這說明棧頂處於匹配狀態,那麼幫助其匹配
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//將結點對應的執行緒自旋或掛起以等待匹配的操作到來
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//計算截止時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//計算自旋時間
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(); //嘗試取消操作
SNode m = s.match; //獲取匹配的結點
//匹配結點不為null,說明要麼匹配到了,要麼被取消,都可以結束掛起了
if (m != null)
return m;
//判斷是否設定了超時等待
if (timed) {
//計算剩餘時間
nanos = deadline - System.nanoTime();
//設定了超時等待,若截止時間已到,那麼操作要被取消
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
//自旋時間遞減
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
//設定等待執行緒
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
//沒有設定超時等待,執行緒一直掛起,知道被配對操作喚醒
else if (!timed)
LockSupport.park(this); //執行緒掛起
//判斷超時等待時間是否大於自旋最大時間
//若是大於,就直接將執行緒掛起一段時間
//否則只自旋不掛起
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
//需要自旋的情形
//當前結點為棧頂
//棧頂為null
//棧頂正在匹配中
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
//嘗試配對
boolean tryMatch(SNode s) {
//嘗試將當前結點的配對結點更新為s,更新成功就喚醒當前結對應的執行緒
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
通過上面對非公平模式下TransferStack中transfer方法的分析,可知非公平模式實際上可以說是非常的不公平,因為TransferStack是利用棧的後進先出性質來進行配對的,也就說基本上都是後來先服務。其大致過程可以簡化成如下圖所示: