1. 程式人生 > >AbstractQueuedSynchronizer超詳細原理解析

AbstractQueuedSynchronizer超詳細原理解析

 今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴於這個類所提供佇列式同步器,比如說常用的ReentranLockSemaphoreCountDownLatch等。

 為了方便理解,我們以一段使用ReentranLock的程式碼為例,講解ReentranLock每個方法中有關AQS的使用。

ReentranLock示例

 我們都知道ReentranLock的加鎖行為和Synchronized類似,都是可重入的鎖,但是二者的實現方式確實完全不同的,我們之後也會講解Synchronized的原理。除此之外,Synchronized的阻塞無法被中斷,而ReentrantLock則提供了可中斷的阻塞

。下面的程式碼是ReentranLock的函式,我們就以此為順序,依次講解這些函式背後的實現原理。

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();
複製程式碼

公平鎖和非公平鎖

ReentranLock分為公平鎖和非公平鎖,二者的區別就在獲取鎖機會是否和排隊順序相關。我們都知道,如果鎖被另一個執行緒持有,那麼申請鎖的其他執行緒會被掛起等待,加入等待佇列。理論上,先呼叫lock函式被掛起等待的執行緒應該排在等待佇列的前端,後呼叫的就排在後邊。如果此時,鎖被釋放,需要通知等待執行緒再次嘗試獲取鎖,公平鎖會讓最先進入佇列的執行緒獲得鎖。而非公平鎖則會喚醒所有執行緒,讓它們再次嘗試獲取鎖,所以可能會導致後來的執行緒先獲得了鎖,則就是非公平。

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
複製程式碼

 我們會發現FairSyncNonfairSync都繼承了Sync類,而Sync的父類就是AbstractQueuedSynchronizer(後續簡稱AQS)。但是AQS的建構函式是空的,並沒有任何操作。

 之後的原始碼分析,如果沒有特別說明,就是指公平鎖。

lock操作

ReentranLocklock函式如下所示,直接呼叫了synclock函式。也就是呼叫了FairSynclock

函式。

    //ReentranLock
    public void lock() {
        sync.lock();
    }
    //FairSync
    final void lock() {
        //呼叫了AQS的acquire函式,這是關鍵函式之一
        acquire(1);
    }
複製程式碼

 我們接下來就正式開始AQS相關的原始碼分析了,acquire函式的作用是獲取同一時間段內只能被一個執行緒獲取的量,這個量就是抽象化的鎖概念。我們先分析程式碼,你慢慢就會明白其中的含義。

public final void acquire(int arg) {
	// tryAcquire先嚐試獲取"鎖",獲取了就不進入後續流程
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //addWaiter是給當前執行緒建立一個節點,並將其加入等待佇列
        //acquireQueued是當執行緒已經加入等待佇列之後繼續嘗試獲取鎖.
        selfInterrupt();
}
複製程式碼

tryAcquireaddWaiteracquireQueued都是十分重要的函式,我們接下來依次學習一下這些函式,理解它們的作用。

//AQS類中的變數.
private volatile int state;
//這是FairSync的實現,AQS中未實現,子類按照自己的需要實現該函式
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //獲取AQS中的state變數,代表抽象概念的鎖.
    int c = getState();
    if (c == 0) { //值為0,那麼當前獨佔性變數還未被執行緒佔有
        //如果當前阻塞佇列上沒有先來的執行緒在等待,UnfairSync這裡的實現就不一致
        if (!hasQueuedPredecessors() && 
            compareAndSetState(0, acquires)) {
            //成功cas,那麼代表當前執行緒獲得該變數的所有權,也就是說成功獲得鎖
            setExclusiveOwnerThread(current);
            // setExclusiveOwnerThread將本執行緒設定為獨佔性變數所有者執行緒
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        //如果該執行緒已經獲取了獨佔性變數的所有權,那麼根據重入性
        //原理,將state值進行加1,表示多次lock
        //由於已經獲得鎖,該段程式碼只會被一個執行緒同時執行,所以不需要
        //進行任何並行處理
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //上述情況都不符合,說明獲取鎖失敗
    return false;
}
複製程式碼

 由上述程式碼我們可以發現,tryAcquire就是嘗試獲取那個執行緒獨佔的變數state。state的值表示其狀態:如果是0,那麼當前還沒有執行緒獨佔此變數;否在就是已經有執行緒獨佔了這個變數,也就是代表已經有執行緒獲得了鎖。但是這個時候要再進行一次判斷,看是否是當前執行緒自己獲得的這個鎖,如果是,就增加state的值。

ReentranLock獲得鎖

 這裡有幾點需要說明一下,首先是compareAndSetState函式,這是使用CAS操作來設定state的值,而且state值設定了volatile修飾符,通過這兩點來確保修改state的值不會出現多執行緒問題。然後是公平鎖和非公平鎖的區別問題,在UnfairSyncnonfairTryAcquire函式中不會在相同的位置上呼叫hasQueuedPredecessors來判斷當前是否已經有執行緒在排隊等待獲得鎖。

 如果tryAcquire返回true,那麼就是獲取鎖成功;如果返回false,那麼就是未獲得鎖,需要加入阻塞等待佇列。我們下面就來看一下addWaiter的相關操作。

等待鎖的阻塞佇列

 將儲存當前執行緒資訊的節點加入到等待佇列的相關函式中涉及到了無鎖佇列的相關演算法,由於在AQS中只是將節點新增到隊尾,使用到的無鎖演算法也相對簡單。真正的無鎖佇列的演算法我們等到分析ConcurrentSkippedListMap時在進行講解。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //先使用快速入列法來嘗試一下,如果失敗,則進行更加完備的入列演算法.
    //只有在必要的情況下才會使用更加複雜耗時的演算法,也就是樂觀的態度
    Node pred = tail; //列尾指標
    if (pred != null) {
        node.prev = pred; //步驟1:該節點的前趨指標指向tail
        if (compareAndSetTail(pred, node)){ //步驟二:cas將尾指標指向該節點
            pred.next = node;//步驟三:如果成果,讓舊列尾節點的next指標指向該節點.
            return node;
        }
    }
    //cas失敗,或在pred == null時呼叫enq
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) { //cas無鎖演算法的標準for迴圈,不停的嘗試
        Node t = tail;
        if (t == null) { //初始化
            if (compareAndSetHead(new Node())) 
              //需要注意的是head是一個哨兵的作用,並不代表某個要獲取鎖的執行緒節點
                tail = head;
        } else {
            //和addWaiter中一致,不過有了外側的無限迴圈,不停的嘗試,自旋鎖
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
複製程式碼

 通過呼叫addWaiter函式,AQS將當前執行緒加入到了等待佇列,但是還沒有阻塞當前執行緒的執行,接下來我們就來分析一下acquireQueued函式。

等待佇列節點的操作

 由於進入阻塞狀態的操作會降低執行效率,所以,AQS會盡力避免試圖獲取獨佔性變數的執行緒進入阻塞狀態。所以,當執行緒加入等待佇列之後,acquireQueued會執行一個for迴圈,每次都判斷當前節點是否應該獲得這個變數(在隊首了)。如果不應該獲取或在再次嘗試獲取失敗,那麼就呼叫shouldParkAfterFailedAcquire判斷是否應該進入阻塞狀態。如果當前節點之前的節點已經進入阻塞狀態了,那麼就可以判定當前節點不可能獲取到鎖,為了防止CPU不停的執行for迴圈,消耗CPU資源,呼叫parkAndCheckInterrupt函式來進入阻塞狀態。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { //一直執行,直到獲取鎖,返回.
            final Node p = node.predecessor(); 
            //node的前驅是head,就說明,node是將要獲取鎖的下一個節點.
            if (p == head && tryAcquire(arg)) { //所以再次嘗試獲取獨佔性變數
                setHead(node); //如果成果,那麼就將自己設定為head
                p.next = null; // help GC
                failed = false;
                return interrupted;
                //此時,還沒有進入阻塞狀態,所以直接返回false,表示不需要中斷呼叫selfInterrupt函式
            }
            //判斷是否要進入阻塞狀態.如果`shouldParkAfterFailedAcquire`
            //返回true,表示需要進入阻塞
            //呼叫parkAndCheckInterrupt;否則表示還可以再次嘗試獲取鎖,繼續進行for迴圈
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //呼叫parkAndCheckInterrupt進行阻塞,然後返回是否為中斷狀態
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //前一個節點在等待獨佔性變數釋放的通知,所以,當前節點可以阻塞
        return true;
    if (ws > 0) { //前一個節點處於取消獲取獨佔性變數的狀態,所以,可以跳過去
        //返回false
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //將上一個節點的狀態設定為signal,返回false,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); //將AQS物件自己傳入
    return Thread.interrupted();
}

複製程式碼

阻塞和中斷

 由上述分析,我們知道了AQS通過呼叫LockSupportpark方法來執行阻塞當前程序的操作。其實,這裡的阻塞就是執行緒不再執行的含義,通過呼叫這個函式,執行緒進入阻塞狀態,上述的lock操作也就阻塞了,等待中斷或在獨佔性變數被釋放。

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);//設定阻塞物件,用來記錄執行緒被誰阻塞的,用於執行緒監控和分析工具來定位
    UNSAFE.park(false, 0L);//讓當前執行緒不再被執行緒排程,就是當前執行緒不再執行.
    setBlocker(t, null);
}
複製程式碼

 關於中斷的相關知識,我們以後再說,就繼續沿著AQS的主線,看一下釋放獨佔性變數的相關操作吧。

ReentrantLock未獲得阻塞,加入佇列

unlock操作

 與lock操作類似,unlock操作呼叫了AQSrelase方法,引數和呼叫acquire時一樣,都是1。

public final boolean release(int arg) {
    if (tryRelease(arg)) { 
    //釋放獨佔性變數,起始就是將status的值減1,因為acquire時是加1
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//喚醒head的後繼節點
        return true;
    }
    return false;
}
複製程式碼

 由上述程式碼可知,release就是先呼叫tryRelease來釋放獨佔性變數。如果成功,那麼就看一下是否有等待鎖的阻塞執行緒,如果有,就呼叫unparkSuccessor來喚醒他們。

protected final boolean tryRelease(int releases) {
    //由於只有一個執行緒可以獲得獨佔先變數,所以,所有操作不需要考慮多執行緒
    int c = getState() - releases; 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { //如果等於0,那麼說明鎖應該被釋放了,否則表示當前執行緒有多次lock操作.
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
複製程式碼

 我們可以看到tryRelease中的邏輯也體現了可重入鎖的概念,只有等到state的值為0時,才代表鎖真正被釋放了。所以獨佔性變數state的值就代表鎖的有無。當state=0時,表示鎖未被佔有,否在表示當前鎖已經被佔有。

private void unparkSuccessor(Node node) {
    .....
     //一般來說,需要喚醒的執行緒就是head的下一個節點,但是如果它獲取鎖的操作被取消,或在節點為null時
     //就直接繼續往後遍歷,找到第一個未取消的後繼節點.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製程式碼

 呼叫了unpark方法後,進行lock操作被阻塞的執行緒就恢復到執行狀態,就會再次執行acquireQueued中的無限for迴圈中的操作,再次嘗試獲取鎖。

ReentrantLock釋放鎖並通知阻塞執行緒恢復執行

後記

 有關AQSReentrantLock的分析就差不多結束了。不得不說,我第一次看到AQS的實現時真是震驚,以前都認為SynchronizedReentrantLock的實現原理是一致的,都是依靠java虛擬機器的功能實現的。沒有想到還有AQS這樣一個背後大Boss在提供幫助啊。學習了這個類的原理,我們對JUC的很多類的分析就簡單了很多。此外,AQS涉及的CAS操作和無鎖佇列的演算法也為我們學習其他無鎖演算法提供了基礎。知識的海洋是無限的啊!