AQS同步佇列和條件佇列
阿新 • • 發佈:2021-09-28
static final class Node { //共享模式,資源可以同時去拿 static final Node SHARED = new Node(); //獨佔模式,只能有一個執行緒去拿 static final Node EXCLUSIVE = null; //表示當前執行緒被中斷了,在佇列中沒有任何意義,可以被剔除了 static final int CANCELLED = 1; /** * 後繼節點的執行緒處於等待狀態,而當前節點如果釋放了同步狀態或者被取消, * 將會通知後繼節點,使後繼節點得以執行 */ static final int SIGNAL = -1; /** * 節點在等待佇列中,節點的執行緒等待在Condition上,當其他執行緒對Condition呼叫了signal()方法後, * 該節點會從等待佇列中轉移到同步佇列中,加入到同步狀態的獲取中 */ static final int CONDITION = -2; /** * 表示下一次共享方式同步狀態獲取將會被無條件的傳播下去 */ static final int PROPAGATE = -3; /** * 標記當前節點的訊號量狀態(1,0,-1,-2,-3)5種狀態 * 使用CAS更改狀態,volatile保證執行緒可見性,併發場景下, * 即被一個執行緒修改後,狀態會立馬讓其他執行緒可見 */ volatile int waitStatus; /** * 前驅節點,當前節點加入到同步佇列中被設定 */ volatile Node prev; /** * 後繼節點 */ volatile Node next; /** * 節點同步狀態的執行緒 */ volatile Thread thread; /** * 等待佇列中的後繼節點,如果當前節點是共享的,那麼這個欄位是一個SHARED常量 * 也就是說節點型別(獨佔和共享)和等待佇列中的後繼節點公用一個欄位 * (用在條件佇列裡面) */ Node nextWaiter; }
CLH同步佇列
CLH 同步佇列是一個 FIFO 雙向佇列,AQS 依賴它來完成同步狀態的管理:
- 當前執行緒如果獲取同步狀態失敗時,AQS則會將當前執行緒已經等待狀態等資訊構造成一個節點(Node)並將其加入到CLH同步佇列,同時會阻塞當前執行緒
- 當同步狀態釋放時,會把首節點喚醒,使其再次嘗試獲取同步狀態。
state為0,表示可以競爭鎖。
state為1,表示無鎖。可重入鎖state可以++。
1、執行緒一和執行緒二cas競爭
2、執行緒二競爭失敗,放入同步佇列。呼叫locksupport.park阻塞。
3、執行緒一執行成功釋放鎖,state置為0,喚醒執行緒二,重複1步驟。
入隊操作
通過“自旋”也就是死迴圈的方式來保證該節點能順利的加入到佇列尾部,只有加入成功才會退出迴圈,否則會一直循序直到成功。
private Node addWaiter(Node mode) { // 以給定的模式來構建節點, mode有兩種模式 // 共享式SHARED, 獨佔式EXCLUSIVE; Node node = new Node(Thread.currentThread(), mode); // 嘗試快速將該節點加入到佇列的尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果快速加入失敗,則通過 anq方式入列 enq(node); return node; } private Node enq(final Node node) { // CAS自旋,直到加入隊尾成功 for (;;) { Node t = tail; if (t == null) { // 如果佇列為空,則必須先初始化CLH佇列,新建一個空節點標識作為Hader節點,並將tail 指向它 if (compareAndSetHead(new Node())) tail = head; } else {// 正常流程,加入佇列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
出隊操作
同步佇列(CLH)遵循FIFO,首節點是獲取同步狀態的節點,首節點的執行緒釋放同步狀態後,將會喚醒它的後繼節點(next),而後繼節點將會在獲取同步狀態成功時將自己設定為首節點
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
Condition條件佇列
public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; // 頭節點 /** Last node of condition queue. */ private transient Node lastWaiter; // 尾節點 public ConditionObject() { } // ... 省略內部程式碼 }
Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意 Lock 實現組合使用,為每個物件提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。
// ========== 阻塞 ========== // 造成當前執行緒在接到訊號或被中斷之前一直處於等待狀態。 void await() throws InterruptedException; // 造成當前執行緒在接到訊號之前一直處於等待狀態。 void awaitUninterruptibly(); // 造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩餘時間, // 如果在`nanosTimeout` 之前喚醒,那麼返回值 `= nanosTimeout - 消耗時間` ,如果返回值 `<= 0` , //則可以認定它已經超時了。 long awaitNanos(long nanosTimeout) throws InterruptedException; // 造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態。 boolean await(long time, TimeUnit unit) throws InterruptedException; // 造成當前執行緒在接到訊號、被中斷或到達指定最後期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回 // true ,否則表示到了指定時間,返回返回 false 。 boolean awaitUntil(Date deadline) throws InterruptedException; // ========== 喚醒 ========== // 喚醒一個等待執行緒。該執行緒從等待方法返回前必須獲得與Condition相關的鎖。 pthread_cond_signal void signal(); // 喚醒所有等待執行緒。能夠從等待方法返回的執行緒必須獲得與Condition相關的鎖。 void signalAll();
例子:
Condition.await() CLH佇列首部出隊,入隊condition佇列尾部 Condition.signal() condition佇列首部喚醒出隊,入隊CLH佇列尾部
入隊
public final void await() throws InterruptedException { // 當前執行緒中斷 if (Thread.interrupted()) throw new InterruptedException(); //當前執行緒加入等待佇列 Node node = addConditionWaiter(); //釋放鎖 long savedState = fullyRelease(node); int interruptMode = 0; /** * 檢測此節點的執行緒是否在同步隊上,如果不在,則說明該執行緒還不具備競爭鎖的資格,則繼續等待 * 直到檢測到此節點在同步佇列上 */ while (!isOnSyncQueue(node)) { //執行緒掛起 LockSupport.park(this); //如果已經中斷了,則退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //競爭同步狀態 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清理下條件佇列中的不是在等待條件的節點 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { Node t = lastWaiter; //尾節點 //Node的節點狀態如果不為CONDITION,則表示該節點不處於等待狀態,需要清除節點 if (t != null && t.waitStatus != Node.CONDITION) { //清除條件佇列中所有狀態不為Condition的節點 unlinkCancelledWaiters(); t = lastWaiter; } //當前執行緒新建節點,狀態 CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); /** * 將該節點加入到條件佇列中最後一個位置 */ if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
出隊
呼叫 ConditionObject的 #signal()
方法,將會喚醒在等待佇列中等待最長時間的節點(條件佇列裡的首節點),在喚醒節點前,會將節點移到CLH同步佇列中。
public final void signal() { //檢測當前執行緒是否為擁有鎖的獨 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //頭節點,喚醒條件佇列中的第一個節點 Node first = firstWaiter; if (first != null) doSignal(first); //喚醒 } private void doSignal(Node first) { do { //修改頭結點,完成舊頭結點的移出工作 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //將該節點從狀態CONDITION改變為初始狀態0, if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //將節點加入到CLH 同步佇列中去,返回的是CLH 同步佇列中node節點前面的一個節點 Node p = enq(node); int ws = p.waitStatus; //如果結點p的狀態為cancel 或者修改waitStatus失敗,則直接喚醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }