1. 程式人生 > 其它 >AQS同步佇列和條件佇列

AQS同步佇列和條件佇列

static final class Node {
    //共享模式,資源可以同時去拿
    static final Node SHARED = new Node();
    //獨佔模式,只能有一個執行緒去拿
    static final Node EXCLUSIVE = null;

   //表示當前執行緒被中斷了,在佇列中沒有任何意義,可以被剔除了
    static final int CANCELLED =  1;
    /**
     * 後繼節點的執行緒處於等待狀態,而當前節點如果釋放了同步狀態或者被取消,
     * 將會通知後繼節點,使後繼節點得以執行
     */
    static final int SIGNAL    = -1;

    /**
     * 節點在等待佇列中,節點的執行緒等待在Condition上,當其他執行緒對Condition呼叫了signal()方法後,
     * 該節點會從等待佇列中轉移到同步佇列中,加入到同步狀態的獲取中
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次共享方式同步狀態獲取將會被無條件的傳播下去
     */
    static final int PROPAGATE = -3;

    /**
     * 標記當前節點的訊號量狀態(1,0,-1,-2,-3)5種狀態
     * 使用CAS更改狀態,volatile保證執行緒可見性,併發場景下,
     * 即被一個執行緒修改後,狀態會立馬讓其他執行緒可見
     */
    volatile int waitStatus;

    /**
     * 前驅節點,當前節點加入到同步佇列中被設定
     */
    volatile Node prev;

    /**
     * 後繼節點
     */
    volatile Node next;

    /**
     * 節點同步狀態的執行緒
     */
    volatile Thread thread;

    /**
     * 等待佇列中的後繼節點,如果當前節點是共享的,那麼這個欄位是一個SHARED常量
     * 也就是說節點型別(獨佔和共享)和等待佇列中的後繼節點公用一個欄位
     * (用在條件佇列裡面)
     */
    Node nextWaiter;
    }

  CLH同步佇列

CLH 同步佇列是一個 FIFO 雙向佇列,AQS 依賴它來完成同步狀態的管理:

  • 當前執行緒如果獲取同步狀態失敗時,AQS則會將當前執行緒已經等待狀態等資訊構造成一個節點(Node)並將其加入到CLH同步佇列,同時會阻塞當前執行緒
  • 當同步狀態釋放時,會把首節點喚醒,使其再次嘗試獲取同步狀態。

state為0,表示可以競爭鎖。

state為1,表示無鎖。可重入鎖state可以++。

1、執行緒一和執行緒二cas競爭

2、執行緒二競爭失敗,放入同步佇列。呼叫locksupport.park阻塞。

3、執行緒一執行成功釋放鎖,state置為0,喚醒執行緒二,重複1步驟。

入隊操作

通過“自旋”也就是死迴圈的方式來保證該節點能順利的加入到佇列尾部,只有加入成功才會退出迴圈,否則會一直循序直到成功。

private Node addWaiter(Node mode) {
// 以給定的模式來構建節點, mode有兩種模式 
//  共享式SHARED, 獨佔式EXCLUSIVE;
  Node node = new Node(Thread.currentThread(), mode);
    // 嘗試快速將該節點加入到佇列的尾部
    Node pred = tail;
     if (pred != null) {
        node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果快速加入失敗,則通過 anq方式入列
        enq(node);
        return node;
    }

private Node enq(final Node node) {
// CAS自旋,直到加入隊尾成功        
for (;;) {
    Node t = tail;
        if (t == null) { // 如果佇列為空,則必須先初始化CLH佇列,新建一個空節點標識作為Hader節點,並將tail 指向它
            if (compareAndSetHead(new Node()))
                tail = head;
            } else {// 正常流程,加入佇列尾部
                node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                }
            }
        }
    }

  

出隊操作

同步佇列(CLH)遵循FIFO,首節點是獲取同步狀態的節點,首節點的執行緒釋放同步狀態後,將會喚醒它的後繼節點(next),而後繼節點將會在獲取同步狀態成功時將自己設定為首節點

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

  

Condition條件佇列

public class ConditionObject implements Condition, java.io.Serializable {    
    /** First node of condition queue. */    
    private transient Node firstWaiter; // 頭節點    
    /** Last node of condition queue. */    
    private transient Node lastWaiter; // 尾節點        
    public ConditionObject() {    }    // ... 省略內部程式碼
}

  

Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意 Lock 實現組合使用,為每個物件提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。

// ========== 阻塞 ==========   
// 造成當前執行緒在接到訊號或被中斷之前一直處於等待狀態。
void await() throws InterruptedException; 
// 造成當前執行緒在接到訊號之前一直處於等待狀態。
void awaitUninterruptibly(); 
// 造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩餘時間,
// 如果在`nanosTimeout` 之前喚醒,那麼返回值 `= nanosTimeout - 消耗時間` ,如果返回值 `<= 0` ,
//則可以認定它已經超時了。
long awaitNanos(long nanosTimeout) throws InterruptedException; 
// 造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態。
boolean await(long time, TimeUnit unit) throws InterruptedException; 
// 造成當前執行緒在接到訊號、被中斷或到達指定最後期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回 // true ,否則表示到了指定時間,返回返回 false 。
boolean awaitUntil(Date deadline) throws InterruptedException; 
// ========== 喚醒 ==========
// 喚醒一個等待執行緒。該執行緒從等待方法返回前必須獲得與Condition相關的鎖。 pthread_cond_signal
void signal(); 
// 喚醒所有等待執行緒。能夠從等待方法返回的執行緒必須獲得與Condition相關的鎖。
void signalAll(); 

  

例子:

Condition.await()  CLH佇列首部出隊,入隊condition佇列尾部 Condition.signal()  condition佇列首部喚醒出隊,入隊CLH佇列尾部

入隊

public final void await() throws InterruptedException {
    // 當前執行緒中斷
    if (Thread.interrupted())
        throw new InterruptedException();
    //當前執行緒加入等待佇列
    Node node = addConditionWaiter();
    //釋放鎖
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    /**
     * 檢測此節點的執行緒是否在同步隊上,如果不在,則說明該執行緒還不具備競爭鎖的資格,則繼續等待
     * 直到檢測到此節點在同步佇列上
     */
    while (!isOnSyncQueue(node)) {
        //執行緒掛起
        LockSupport.park(this);
        //如果已經中斷了,則退出
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //競爭同步狀態
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清理下條件佇列中的不是在等待條件的節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

  

private Node addConditionWaiter() {
    Node t = lastWaiter;    //尾節點
    //Node的節點狀態如果不為CONDITION,則表示該節點不處於等待狀態,需要清除節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        //清除條件佇列中所有狀態不為Condition的節點
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //當前執行緒新建節點,狀態 CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    /**
     * 將該節點加入到條件佇列中最後一個位置
     */
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

  

出隊

呼叫 ConditionObject的 #signal() 方法,將會喚醒在等待佇列中等待最長時間的節點(條件佇列裡的首節點),在喚醒節點前,會將節點移到CLH同步佇列中。

public final void signal() {
    //檢測當前執行緒是否為擁有鎖的獨
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //頭節點,喚醒條件佇列中的第一個節點
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);    //喚醒
}

private void doSignal(Node first) {
    do {
        //修改頭結點,完成舊頭結點的移出工作
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

 final boolean transferForSignal(Node node) {
    //將該節點從狀態CONDITION改變為初始狀態0,
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //將節點加入到CLH 同步佇列中去,返回的是CLH 同步佇列中node節點前面的一個節點
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果結點p的狀態為cancel 或者修改waitStatus失敗,則直接喚醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}