圖解SynchronousQueue原理-公平模式
SynchronousQueue原理詳解-公平模式
一、介紹
SynchronousQueue是一個雙棧雙佇列演算法,無空間的佇列或棧,任何一個對SynchronousQueue寫需要等到一個對SynchronousQueue的讀操作,反之亦然。一個讀操作需要等待一個寫操作,相當於是交換通道,提供者和消費者是需要組隊完成工作,缺少一個將會阻塞執行緒,知道等到配對為止。
SynchronousQueue是一個佇列和棧演算法實現,在SynchronousQueue中雙佇列FIFO提供公平模式,而雙棧LIFO提供的則是非公平模式。
對於SynchronousQueue來說,他的put方法和take方法都被抽象成統一方法來進行操作,通過抽象出內部類Transferer,來實現不同的操作。
注意事項:本文分析主要是針對jdk1.8的版本進行分析,下面的程式碼中的執行緒執行順序可能並不能完全保證順序性,執行時間比較短,所以暫且認定有序執行。
約定:圖片中以Reference-開頭的代表物件的引用地址,通過箭頭方式進行引用物件。
Transferer.transfer方法主要介紹如下所示:
abstract static class Transferer<E> { /** * 執行put和take方法. * * @param e 非空時,表示這個元素要傳遞給消費者(提供者-put); * 為空時, 則表示當前操作要請求消費一個數據(消費者-take)。 * offered by producer. * @param timed 決定是否存在timeout時間。 * @param nanos 超時時長。 * @return 如果返回非空, 代表資料已經被消費或者正常提供; 如果為空, * 則表示由於超時或中斷導致失敗。可通過Thread.interrupted來檢查是那種。 */ abstract E transfer(E e, boolean timed, long nanos); }
接下來看一下SynchronousQueue的欄位資訊:
/** CPU數量 */ static final int NCPUS = Runtime.getRuntime().availableProcessors(); /** * 自旋次數,如果transfer指定了timeout時間,則使用maxTimeSpins,如果CPU數量小於2則自旋次數為0,否則為32 * 此值為經驗值,不隨CPU數量增加而變化,這裡只是個常量。 */ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * 自旋次數,如果沒有指定時間設定,則使用maxUntimedSpins。如果NCPUS數量大於等於2則設定為為32*16,否則為0; */ static final int maxUntimedSpins = maxTimedSpins * 16; /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */ static final long spinForTimeoutThreshold = 1000L;
- NCPUS:代表CPU的數量
- maxTimedSpins:自旋次數,如果transfer指定了timeout時間,則使用maxTimeSpins,如果CPU數量小於2則自旋次數為0,否則為32,此值為經驗值,不隨CPU數量增加而變化,這裡只是個常量。
- maxUntimedSpins:自旋次數,如果沒有指定時間設定,則使用maxUntimedSpins。如果NCPUS數量大於等於2則設定為為32*16,否則為0;
- spinForTimeoutThreshold:為了防止自定義的時間限過長,而設定的,如果設定的時間限長於這個值則取這個spinForTimeoutThreshold 為時間限。這是為了優化而考慮的。這個的單位為納秒。
公平模式-TransferQueue
TransferQueue內部是如何進行工作的,這裡先大致講解下,佇列採用了互補模式進行等待,QNode中有一個欄位是isData,如果模式相同或空佇列時進行等待操作,互補的情況下就進行消費操作。
入隊操作相同模式
不同模式時進行出佇列操作:
這時候來了一個isData=false的互補模式,佇列就會變成如下狀態:
TransferQueue繼承自Transferer抽象類,並且實現了transfer方法,它主要包含以下內容:
QNode
代表隊列中的節點元素,它內部包含以下欄位資訊:
- 欄位資訊描述
欄位 | 描述 | 型別 |
---|---|---|
next | 下一個節點 | QNode |
item | 元素資訊 | Object |
waiter | 當前等待的執行緒 | Thread |
isData | 是否是資料 | boolean |
- 方法資訊描述
方法 | 描述 |
---|---|
casNext | 替換當前節點的next節點 |
casItem | 替換當前節點的item資料 |
tryCancel | 取消當前操作,將當前item賦值為this(當前QNode節點) |
isCancelled | 如果item是this(當前QNode節點)的話就返回true,反之返回false |
isOffList | 如果已知此節點離佇列,判斷next節點是不是為this,則返回true,因為由於* advanceHead操作而忘記了其下一個指標。 |
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
// 分為兩種狀態1.有資料=true 2.無資料=false
boolean isData = (e != null);
// 迴圈內容
for (;;) {
// 尾部節點。
QNode t = tail;
// 頭部節點。
QNode h = head;
// 判斷頭部和尾部如果有一個為null則自旋轉。
if (t == null || h == null) // 還未進行初始化的值。
continue; // 自旋
// 頭結點和尾節點相同或者尾節點的模式和當前節點模式相同。
if (h == t || t.isData == isData) { // 空或同模式。
// tn為尾節點的下一個節點資訊。
QNode tn = t.next;
// 這裡我認為是閱讀不一致,原因是當前執行緒還沒有阻塞的時候其他執行緒已經修改了尾節點tail會導致當前執行緒的tail節點不一致。
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 這裡如果指定timed判斷時間小於等於0直接返回。
return null;
// 判斷新增節點是否為null,為null直接構建新節點。
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 如果next節點不為null說明已經有其他執行緒進行tail操作
continue;
// 將t節點替換為s節點
advanceTail(t, s);
// 等待有消費者消費執行緒。
Object x = awaitFulfill(s, e, timed, nanos);
// 如果返回的x,指的是s.item,如果s.item指向自己的話清除操作。
if (x == s) {
clean(t, s);
return null;
}
// 如果沒有取消聯絡
if (!s.isOffList()) {
// 將當前節點替換頭結點
advanceHead(t, s); // unlink if head
if (x != null) // 取消item值,這裡是take方法時會進行item賦值為this
s.item = s;
// 將等待執行緒設定為null
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
// 獲取頭結點下一個節點
QNode m = h.next; // node to fulfill
// 如果當前執行緒尾節點和全域性尾節點不一致,重新開始
// 頭結點的next節點為空,代表無下一個節點,則重新開始,
// 當前執行緒頭結點和全域性頭結點不相等,則重新開始
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
我們來看一下awaitFulfill方法內容:
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 如果指定了timed則為System.nanoTime() + nanos,反之為0。
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 獲取當前執行緒。
Thread w = Thread.currentThread();
// 如果頭節點下一個節點是當前s節點(以防止其他執行緒已經修改了head節點)
// 則運算(timed ? maxTimedSpins : maxUntimedSpins),否則直接返回。
// 指定了timed則使用maxTimedSpins,反之使用maxUntimedSpins
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋
for (;;) {
// 判斷是否已經被中斷。
if (w.isInterrupted())
//嘗試取消,將當前節點的item修改為當前節點(this)。
s.tryCancel(e);
// 獲取當前節點內容。
Object x = s.item;
// 判斷當前值和節點值不相同是返回,因為彈出時會將item值賦值為null。
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)![](https://img2018.cnblogs.com/blog/458325/201905/458325-20190511194850882-1013581623.png)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
- 首先先判斷有沒有被中斷,如果被中斷則取消本次操作,將當前節點的item內容賦值為當前節點。
- 判斷當前節點和節點值不相同是返回
- 將當前執行緒賦值給當前節點
- 自旋,如果指定了timed則使用
LockSupport.parkNanos(this, nanos);
,如果沒有指定則使用LockSupport.park(this);
。 - 中斷相應是在下次才能被執行。
通過上面原始碼分析我們這裡做出簡單的示例程式碼演示一下put操作和take操作是如何進行運作的,首先看一下示例程式碼,如下所示:
/**
* SynchronousQueue進行put和take操作。
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
Thread.sleep(2000);
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
Thread.sleep(10000);
Thread thread3 = new Thread(() -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread3.start();
}
}
首先上來之後進行的是兩次put操作,然後再take操作,預設佇列上來會進行初始化,初始化的內容如下程式碼所示:
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
初始化後佇列的狀態如下圖所示:
當執行緒1執行put操作時,來分析下程式碼:
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue;
首先執行區域性變數t代表隊尾指標,h代表隊頭指標,判斷隊頭和隊尾不為空則進行下面的操作,接下來是if…else語句這裡是分水嶺,當相同模式操作的時候執行if語句,當進行不同模式操作時執行的是else語句,程式是如何控制這樣的操作的呢?接下來我們慢慢分析一下:
if (h == t || t.isData == isData) { // 佇列為空或者模式相同時進行if語句
QNode tn = t.next;
if (t != tail) // 判斷t是否是隊尾,不是則重新迴圈。
continue;
if (tn != null) { // tn是隊尾的下個節點,如果tn有內容則將隊尾更換為tn,並且重新迴圈操作。
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 如果指定了timed並且延時時間用盡則直接返回空,這裡操作主要是offer操作時,因為佇列無儲存空間的當offer時不允許插入。
return null;
if (s == null) // 這裡是新節點生成。
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 將尾節點的next節點修改為當前節點。
continue;
advanceTail(t, s); // 隊尾移動
Object x = awaitFulfill(s, e, timed, nanos); //自旋並且設定執行緒。
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
上面程式碼是if語句中的內容,進入到if語句中的判斷是如果頭結點和尾節點相等代表隊列為空,並沒有元素所有要進行插入佇列的操作,或者是隊尾的節點的isData標誌和當前操作的節點的型別一樣時,會進行入隊操作,isData標識當前元素是否是資料,如果為true代表是資料,如果為false則代表不是資料,換句話說只有模式相同的時候才會往佇列中存放,如果不是模式相同的時候則代表互補模式,就不走if語句了,而是走了else語句,上面程式碼中做有註釋講解,下面看一下這裡:
if (s == null) // 這裡是新節點生成。
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 將尾節點的next節點修改為當前節點。
continue
當執行上面程式碼後,佇列的情況如下圖所示:(這裡視為插入第一個元素
圖,方便下面的引用)
接下來執行這段程式碼:
advanceTail(t, s); // 隊尾移動
修改了tail節點後,這時候就需要進行自旋操作,並且設定QNode的waiter等待執行緒,並且將執行緒等待,等到喚醒執行緒進行喚醒操作
Object x = awaitFulfill(s, e, timed, nanos); //自旋並且設定執行緒。
方法內部分析區域性內容,上面已經全部內容的分析:
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
如果自旋時間spins還有則進行迴圈遞減操作,接下來判斷如果當前節點的waiter是空則價格當前執行緒賦值給waiter,上圖中顯然是為空的所以會把當前執行緒進行賦值給我waiter,接下來就是等待操作了。
上面執行緒則處於等待狀態,接下來是執行緒二進行操作,這裡不進行重複進行,插入第二個元素佇列的狀況,此時執行緒二也處於等待狀態。
上面的主要是put了兩次操作後佇列的情況,接下來分析一下take操作時又是如何進行操作的,當take操作時,isData為false,而隊尾的isData為true兩個不相等,所以不會進入到if語句,而是進入到了else語句
} else { // 互補模式
QNode m = h.next; // 獲取頭結點的下一個節點,進行互補操作。
if (t != tail || m == null || h != head)
continue; // 這裡就是為了防止閱讀不一致的問題
Object x = m.item;
if (isData == (x != null) || // 如果x=null說明已經被讀取了。
x == m || // x節點和m節點相等說明被中斷操作,被取消操作了。
!m.casItem(x, e)) { // 這裡是將item值設定為null
advanceHead(h, m); // 移動頭結點到頭結點的下一個節點
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
首先獲取頭結點的下一個節點用於互補操作,也就是take操作,接下來進行閱讀不一致的判斷,防止其他執行緒進行了閱讀操作,接下來獲取需要彈出內容x=1,首先進行判斷節點內容是不是已經被消費了,節點內容為null時則代表被消費了,接下來判斷節點的item值是不是和本身相等如果相等話說明節點被取消了或者被中斷了,然後移動頭結點到下一個節點上,然後將refenrence-715
的item值修改為null,至於為什麼修改為null這裡留下一個懸念,這裡還是比較重要的,大家看到這裡的時候需要注意下
,顯然這些都不會成立,所以if語句中內容不會被執行,接下來的佇列的狀態是是這個樣子的:
OK,接下來就開始移動隊頭head了,將head移動到m節點上,執行程式碼如下所示:
advanceHead(h, m);
此時佇列的狀態是這個樣子的:
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
接下來將執行喚醒被等待的執行緒,也就是thread-0,然後返回獲取item值1,take方法結束,但是這裡並沒有結束,因為喚醒了put的執行緒,此時會切換到put方法中,這時候執行緒喚醒後會執行awaitFulfill
方法,此時迴圈時,有與item值修改為null則直接返回內容。
Object x = s.item;
if (x != e)
return x;
這裡的程式碼我們可以對照插入第一個元素
圖,s節點也就是當前m節點,獲取值得時候已經修改為null,但是當時插入的值時1,所以兩個不想等了,則直接返回null值。
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
又返回到了transfer方法的if語句中,此時x和s並不相等所以不用進行clean操作,首先判斷s節點是否已經離隊了,顯然並沒有進行離隊操作,advanceHead(t, s);
操作不會被執行因為上面已近將頭節點修改了,但是第一次插入的時候頭結點還是reference-716
,此時已經是reference-715
,而t節點的引用地址是reference-716
,所以不會操作,接下來就是將waiter設定為null,也就是忘記掉等待的執行緒。
分析了正常的take和put操作,接下來分析下中斷操作,由於中斷相應後,會被執行if(w.isInterrupted())
這段程式碼,它會執行s.tryCancel(e)
方法,這個方法的作用的是將QNode節點的item節點賦值為當前QNode,這時候x和e值就不相等了(if (x != e)
),x的值是s.item,則為當前QNode,而e的值是使用者指定的值,這時候返回x(s.item)。返回到函式呼叫地方transfer
中,這時候要執行下面語句:
if (x == s) {
clean(t, s);
return null;
}
進入到clean方法執行清理當前節點,下面是方法clean程式碼:
/**
* Gets rid of cancelled node s with original predecessor pred.
*/
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
// 判斷現在的t是不是末尾節點,可能其他執行緒插入了內容導致不是最後的節點。
if (t != tail)
continue;
// 如果不是最後節點的話將其現在t.next節點作為tail尾節點。
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 如果當前節點不是尾節點進入到這裡面。
if (s != t) { // If not tail, try to unsplice
// 獲取當前節點(被取消的節點)的下一個節點。
QNode sn = s.next;
// 修改上一個節點的next(下一個)元素為下下個節點。
if (sn == s || pred.casNext(s, sn))
//返回。
return;
}
QNode dp = cleanMe;
if (dp != null) { // 嘗試清除上一個標記為清除的節點。
QNode d = dp.next; //1.獲取要被清除的節點
QNode dn;
if (d == null || // 被清除節點不為空
d == dp || // 被清除節點已經離隊
!d.isCancelled() || // 被清除節點是標記為Cancel狀態的。
(d != t && // 被清除節點不是尾節點
(dn = d.next) != null && // 被清除節點下一個節點不為null
dn != d && // that is on list
dp.casNext(d, dn))) // 將被清除的節點的前一個節點的下一個節點修改為被清除節點的下一個節點。
casCleanMe(dp, null); // 清空cleanMe節點。
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred)) // 這裡將上一個節點標記為被清除操作,但是其實要操作的是下一個節點。
return; // Postpone cleaning s
}
}
- 如果節點中取消的頭結點的下一個節點,只需要移動當前head節點到下一個節點即可。
- 如果取消的是中間的節點,則將當前節點next節點修改為下下個節點。
- 如果修改為末尾的節點,則將當前節點放入到QNode的clearMe中,等待有內容進來之後下一次進行清除操作。
例項一:清除頭結點下一個節點,下面是例項程式碼進行講解:
/**
* 清除頭結點的下一個節點例項程式碼。
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
AtomicInteger atomicInteger = new AtomicInteger(0);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
Thread.sleep(200);
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
Thread.sleep(2000);
thread1.interrupt();
}
}
上面例子說明我們啟動了兩個執行緒,分別向SynchronousQueue佇列中添加了元素1和元素2,新增成功之後的,讓主執行緒休眠一會,然後將第一個執行緒進行中斷操作,新增兩個元素後節點所處在的狀態為下圖所示:
當我們呼叫thread1.interrupt
時,此時執行緒1等待的消費操作將被終止,會相應上面awaitFulfill
方法,該方法會執行下面程式碼:
if (w.isInterrupted())
//嘗試取消,將當前節點的item修改為當前節點(this)。
s.tryCancel(e);
// 獲取當前節點內容。
Object x = s.item;
// 判斷當前值和節點值不相同是返回,因為彈出時會將item值賦值為null。
if (x != e)
return x;
首先上來現將s節點(上圖中的Reference-715引用物件)的item節點設定為當前節點引用(Reference-715引用物件),所以s節點和e=1不相等則直接返回,此時節點的狀態變化如下所示:
退出awaitFulfill
並且返回的是s節點內容(實際上返回的就是s節點),接下來返回到呼叫awaitFulfill
的方法transfer
方法中
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // 是否是被取消了
clean(t, s);
return null;
}
首先判斷的事x節點和s節點是否相等,上面我們也說了明顯是相等的所以這裡會進入到clean方法中,clean(QNode pred, QNode s)
clean方法一個是前節點,一個是當前被取消的節點,也就是當前s節點的前節點是head節點,接下來我們一步一步的分析程式碼:
s.waiter = null; // 刪除等待的執行緒。
進入到方法體之後首先先進行的是將當前節點的等待執行緒刪除,如下圖所示:
接下來進入while迴圈,迴圈內容時pred.next == s
如果不是則表示已經移除了節點,反之還在佇列中,則進行下面的操作:
QNode h = head;
QNode hn = h.next; // 如果取消的是第一個節點則進入下面語句
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
可以看到首先h節點為head節點,hn為頭結點的下一個節點,在進行判斷頭結點的下一個節點不為空並且頭結點下一個節點是被中斷的節點(取消的節點),則進入到if語句中,if語句其實也很簡單就是將頭結點修改為頭結點的下一個節點(s節點,別取消節點,並且將前節點的next節點修改為自己,也就是移除了之前的節點,我們看下advanceHead方法:
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
首先上來先進行CAS移動頭結點,再講原來頭結點h的next節點修改為自己(h),為什麼這樣做呢?因為上面進行advanceHead
之後並沒有退出迴圈,是進行continue操作,也就是它並沒有跳出while迴圈,他還會迴圈一次prev.next此時已經不能等於s所以退出迴圈,如下圖所示:
例項二:清除中間的節點
/**
* SynchronousQueue例項二,清除中間的節點。
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
AtomicInteger atomicInteger = new AtomicInteger(0);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
//休眠一會。
Thread.sleep(200);
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
//休眠一會。
Thread.sleep(200);
Thread thread3 = new Thread(() -> {
try {
queue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread3.start();
//休眠一會。
Thread.sleep(10000);
thread2.interrupt();
}
}
看上面例子,首先先進行put操作三次,也就是入隊3條資料,分別是整型值1,整型值2,整型值3,然後將當前執行緒休眠一下,對中間執行緒進行中斷操作,通過讓主執行緒休眠一會保證執行緒執行順序性(當然上面執行緒不一定能保證執行順序,因為put操作一下子就執行完了所以這點時間是可以的),此時佇列所處的狀態來看一下下圖:
當休眠一會之後,進入到threa2進行中斷操作,目前上圖中表示Reference-723
被中斷操作,此時也會進入到awaitFulfill
方法中,將Reference-723
的item節點修改為當前節點,如下圖所示:
進入到clear方法中此時的prev節點為Reference-715
,s節點是被清除節點,還是首先進入clear方法中先將waiter設定為null,取消當前執行緒內容,如下圖所示:
接下來進入到迴圈中,進行下面處理
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
第一個if語句已經分析過了所以說這裡不會進入到裡面去,接下來是進行尾節點t是否是等於head節點如果相等則代表沒有元素,在判斷當前方法的t尾節點是不是真正的尾節點tail如果不是則進行修改尾節點,先來看一下現在的狀態:
tn != null
判斷如果tn不是尾節點,則將tn作為尾節點處理,如果處理之後還不是尾節點還會進行處理直到tail是尾節點未知,我們現在這個是尾節點所以跳過這段程式碼。s != t
通過上圖可以看到s節點是被清除節點,並不是尾節點所以進入到迴圈中:
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
首先獲取的s節點的下一個節點,上圖中表示Reference-725
節點,判斷sn是都等於當前節點顯然這一條不成立,pred節點為Reference-715
節點,將715節點的next節點變成Reference-725
節點,這裡就將原來的節點清理出去了,現在的狀態如下所示:
例項三:刪除的節點是尾節點
/**
* SynchronousQueue例項三,刪除的節點為尾節點
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
AtomicInteger atomicInteger = new AtomicInteger(0);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
Thread.sleep(10000);
thread2.interrupt();
Thread.sleep(10000);
Thread thread3 = new Thread(() -> {
try {
queue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread3.start();
Thread.sleep(10000);
thread3.interrupt();
}
}
該例子主要說明一個問題就是刪除的節點如果是末尾節點的話,clear
方法又是如何處理的,首先啟動了三個執行緒其中主執行緒休眠了一會,為了能讓插入的順序保持執行緒1,執行緒2,執行緒3這樣子,啟動第二個執行緒後,又將第二個執行緒中斷,這是第二個執行緒插入的節點為尾節點,然後再啟動第三個節點插入值,再中斷了第三個節點末尾節點,說一下為啥這樣操作,因為當清除尾節點時,並不是直接移除當前節點,而是將被清除的節點的前節點設定到QNode的CleanMe中,等待下次clear方法時進行清除上次儲存在CleanMe的節點,然後再處理當前被中斷節點,將新的被清理的節點prev設定為cleanMe當中,等待下次進行處理,接下來一步一步分析,首先我們先來看一下第二個執行緒啟動後節點的狀態。
此時執行thread2.interrupt();
將第二個執行緒中斷,這時候會進入到clear方法中,前面的程式碼都不會被返回,會執行下面的語句:
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))
return;
首先獲得TransferQueue當中cleanMe節點,此時獲取的為null,當判斷dp!=null時就會被跳過,直接執行
casCleanMe(null, pred)
此時pred傳入的值時t節點指向的內容,也就是當前節點的上一個節點,它會被標記為清除操作節點(其實並不清楚它而是清除它下一個節點,也就是說item=this的節點),此時看一下節點狀態為下圖所示:
接下來第三個執行緒啟動了這時候又往佇列中添加了元素3,此時佇列的狀況如下圖所示:
此時thread3也被中斷操作了,這時候還是執行上面的程式碼,但是這次不同的點在於cleanMe已經不是空值,是有內容的,首先獲取的是cleanMe的下一個節點(d),然我來把變數標記在圖上然後看起來好分析一些,如下圖所示:
dp表示d節點的前一個pred節點,dn表示d節點的next節點,主要邏輯在這裡:
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s
首先判斷d節點是不是為null,如果d節點為null代表已經清除掉了,如果cleanMe節點的下一個節點和自己相等,說明需要清除的節點已經離隊了,判斷下個節點是不是需要被清除的節點,目前看d節點是被清除的節點,然後就將被清除的節點的下一個節點賦值給dn並且判斷d節點是不是末尾節點,如果不是末尾節點則進行dp.casNext
方法,這個地方是關鍵點,它將被清除節點d的前節點的next節點修改為被清除節點d的後面節點dn,然後呼叫caseCleanMe將TransferQueue中的cleanMe節點清空,此時節點的內容如下所示:
可以看出將上一次標記為清除的節點清除了佇列中,清除完了就完事兒?那這次的怎麼弄呢?因為現在執行的是thread3的中斷程式,所以上面並沒有退出,而是再次進入迴圈,迴圈之後發現dp為null則會執行casCleanMe(null, pred)
,此時當前節點s的前一個節點已經被清除佇列,但是並不影響後續的清除操作,因為前節點的next節點還在維護中,也是前節點的next指向還是reference-725
,如下圖所示:
就此分析完畢如果有不正確的地方請指正