1. 程式人生 > >ReentrantLock原始碼探究1:非公平鎖的獲取和釋放

ReentrantLock原始碼探究1:非公平鎖的獲取和釋放

1.AQS簡單介紹

​ Sync是ReentrantLock的一個內部類,它繼承了AbstractQueuedSynchronizer,即AQS,在CountDownLatch、FutureTask、Semaphore、ReentrantLock等原始碼中,我們都能看到它們的身影,足見其重要性。此處我們需要先了解下AQS才能更愉悅地閱讀原始碼。

​ AQS中是基於FIFO佇列的實現,那麼它必然包含佇列中元素的定義,在這裡它是Node:

屬 性 定 義
Node SHARED = new Node() 表示Node處於共享模式
Node EXCLUSIVE = null 表示Node處於獨佔模式
int CANCELLED = 1 因為超時或者中斷,Node被設定為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出佇列,被GC回收
int SIGNAL = -1 表示這個Node的繼任Node被阻塞了,到時需要通知它
int CONDITION = -2 表示這個Node在條件佇列中,因為等待某個條件而被阻塞
int PROPAGATE = -3 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取可以無條件傳播
int waitStatus 0,新Node會處於這種狀態
Node prev 佇列中某個Node的前驅Node
Node next 佇列中某個Node的後繼Node
Thread thread 這個Node持有的執行緒,表示等待鎖的執行緒
Node nextWaiter 表示下一個等待condition的Node

AQS中包含的方法有

屬性/方法 含 義
Thread exclusiveOwnerThread 這個是AQS父類AbstractOwnableSynchronizer的屬性,表示獨佔模式同步器的當前擁有者
Node 上面已經介紹過了,FIFO佇列的基本單位
Node head FIFO佇列中的頭Node
Node tail FIFO佇列中的尾Node
int state 同步狀態,0表示未鎖
int getState() 獲取同步狀態
setState(int newState) 設定同步狀態
boolean compareAndSetState(int expect, int update) 利用CAS進行State的設定
long spinForTimeoutThreshold = 1000L 執行緒自旋等待的時間
Node enq(final Node node) 插入一個Node到FIFO佇列中
Node addWaiter(Node mode) 為當前執行緒和指定模式建立並擴充一個等待佇列
void setHead(Node node) 設定佇列的頭Node
void unparkSuccessor(Node node) 如果存在的話,喚起Node持有的執行緒
void doReleaseShared() 共享模式下做釋放鎖的動作
void cancelAcquire(Node node) 取消正在進行的Node獲取鎖的嘗試
boolean shouldParkAfterFailedAcquire(Node pred, Node node) 在嘗試獲取鎖失敗後是否應該禁用當前執行緒並等待
void selfInterrupt() 中斷當前執行緒本身
boolean parkAndCheckInterrupt() 禁用當前執行緒進入等待狀態並中斷執行緒本身
boolean acquireQueued(final Node node, int arg) 佇列中的執行緒獲取鎖
tryAcquire(int arg) 嘗試獲得鎖(由AQS的子類實現它
tryRelease(int arg) 嘗試釋放鎖(由AQS的子類實現它
isHeldExclusively() 是否獨自持有鎖
acquire(int arg) 獲取鎖
release(int arg) 釋放鎖
compareAndSetHead(Node update) 利用CAS設定頭Node
compareAndSetTail(Node expect, Node update) 利用CAS設定尾Node
compareAndSetWaitStatus(Node node, int expect, int update) 利用CAS設定某個Node中的等待狀態

另外在原始碼中多處使用了CAS,有關CAS的內容,可檢視:
樂觀鎖的一種實現方式:CAS

2.非公平鎖的獲取過程

假設有兩個執行緒:執行緒1和執行緒2嘗試獲取同一個鎖(非公平鎖),過程如下

  1. 執行緒1呼叫lock方法
    final void lock() {
        if (compareAndSetState(0, 1))   //使用CAS將同步狀態設定為1
            setExclusiveOwnerThread(Thread.currentThread());//成功則設定執行緒1為當前鎖的獨佔執行緒
        else
            acquire(1); //設定失敗時,嘗試獲取鎖
    }
//上述程式碼正常情況下執行完畢後,執行緒1成為了獨佔執行緒。
  1. 執行緒2此時也嘗試獲取鎖,呼叫lock方法,此時CAS設定時會失敗,進入acquire方法。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt(); //重新獲取鎖失敗且執行緒發生了中斷,自行中斷
}
  • 這裡面,會首先呼叫tryAcquire方法嘗試再次獲取鎖,因為我們演示的是非公平鎖,因此呼叫的方法是nonfairTryAcquire。
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();  //current指向當前執行緒2
        int c = getState();      //若執行緒1未釋放鎖,則c>0,若執行緒1已經釋放鎖,則c=0
        if (c == 0) {            //執行緒1已經釋放了鎖
            if (compareAndSetState(0, acquires)) {  //使用CAS將state設定為1
                setExclusiveOwnerThread(current);   //並設定執行緒2為獨佔執行緒
                return true;    //返回true,獲取鎖成功
            }
        }
        //判斷該執行緒是否是重入,即之前已經獲取到了鎖
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires; //每重入一次,將state+1。
            if (nextc < 0) // overflow //state+1<0,說明原state為負數,丟擲異常
                throw new Error("Maximum lock count exceeded");
            setState(nextc);    //設定state為新值
            return true;        //返回true,獲取重入鎖成功。
        }
        return false;           //返回flase,獲取鎖失敗
    }
  • 此時執行緒2使用tryAcquire方法獲取鎖,如果也是失敗,那麼,會呼叫addWaiter(Node.EXCLUSIVE)方法
private Node addWaiter(Node mode) {  //此處mode為獨佔模式
    Node node = new Node(Thread.currentThread(), mode);//將當前執行緒(此處為執行緒2)繫結到新節點node上,並設定為獨佔模式
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;   //獲取原佇列的尾節點pred
    if (pred != null) { //若原尾節點pred非空,則說明已經存在一個佇列
        node.prev = pred;   //設定新節點node的前置為pred
        if (compareAndSetTail(pred, node)) {//使用CAS設定新的尾節點為node
            pred.next = node;   //設定pred的後置為node,建立雙向連結
            return node;        //返回node
        }
    }
    enq(node);      //進入此處說明原佇列不存在,需要初始化佇列
    return node;
}
private Node enq(final Node node) { //此處傳入的引數node是綁定了執行緒2的節點
    for (;;) {
        Node t = tail;      //獲取原佇列的尾節點t
        if (t == null) { // Must initialize //若尾節點為空,說明佇列尚未形成
            if (compareAndSetHead(new Node())) //設定一個空的,未繫結任何執行緒的節點為新佇列的頭節點
                tail = head;    //新佇列只有一個節點,既是頭也是尾
        } else {        //若t非空,說明佇列已經形成
            node.prev = t;  //將node的前置設為t
            if (compareAndSetTail(t, node)) { //CAS設定新的尾節點為node
                t.next = node;  //設定t的後繼為node,建立雙向連結
                return t;       //返回t
            }
        }
    }
}
  • 現在看外層方法acquireQueued,此時傳入的引數node是執行緒2所在節點,該方法的作用是在等待佇列中,當有其他執行緒釋放了資源,那麼佇列中在等待的執行緒就可以開始行動
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //是否獲取到資源
    try {
        boolean interrupted = false; //等待過程中是否被中斷
        //自旋,維護等待佇列中執行緒的執行。
        for (;;) {
            final Node p = node.predecessor(); //獲取node的前置p
            if (p == head && tryAcquire(arg)) { //若前置p為頭結點並且重新獲取鎖成功
                setHead(node);                  //設定新的頭節點為node
                p.next = null; // help GC       //取消p和連結串列的連結
                failed = false;                 //獲取資源未失敗
                return interrupted;             //等待過程未被中斷
            }
            if (shouldParkAfterFailedAcquire(p, node) &&  //若前置節點是Node.SIGNAL狀態
                parkAndCheckInterrupt())        //將節點設定為Waitting狀態
                interrupted = true;   //此時執行緒中斷狀態為true
        }
    } finally { 
        if (failed)                   //如果獲取資源成功那麼取消獲取過程
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;    //獲取前置節點的等待狀態
        if (ws == Node.SIGNAL)      //Node.SIGNAL表示繼任者執行緒需要被喚醒,那麼就可以直接返回;
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {       //若ws>0,說明前驅被取消,那麼執行迴圈往前一直查詢,知道找到未被取消的,將node排在它的後面。
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {   //進入else,說明ws=0或者Node.PROPAGATE
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //使用CAS設定前置的節點狀態為Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

該部分程式碼可以用現實中排隊辦理業務的情況來說明:

假設你排隊去辦理業務,隊伍很長,因此除了當前正在辦理業務的人,其他所有排隊的人都在低頭玩手機,且每個排隊的人有以下三種狀態:①.正常排隊,且辦完業務後會通知後面的人別玩手機了可以開始辦理業務了。②.發現隊伍過長,不排隊了,走了。③.正常排隊,辦完業務後不通知後面的人,直接走。
   此時你進入該隊伍的尾部開始排隊。
1.第一步,判斷排隊在你前面的人是否會通知你,如果會通知,那麼我們就可以不用關心其他問題,在佇列中待著玩手機即可。
2.第二步,如果發現排在你前面的人不排隊了,要走了,那麼此時我們就得往前走一位,並開始不斷詢問前面的人是不是也準備不排隊了,直到我們排在了一個確定不會走的人後面。
3.第三步,排在你前面的人不是準備走的,但是他也不會通知你,那麼你就要告訴他,一定得在辦完業務後通知你。

當我們確定我們已經在佇列中待好後(前置會通知我們),那麼我們就可以開始休息。parkAndCheckInterrupt方法讓我們的執行緒進入等待的狀態,即休息狀態。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);  //呼叫park()使執行緒進入waiting狀態
    return Thread.interrupted(); //如果被喚醒,檢視自己是不是被中斷的。   
}

3.鎖的釋放過程

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {  //若tryRelease後無人佔用鎖
        Node h = head;      //獲取佇列的頭結點h
        if (h != null && h.waitStatus != 0) //若h非空,且h的waitStatus不為0
            unparkSuccessor(h);     //喚醒後繼
        return true;
    }
    return false;
}
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;  //當前state-1,得到c
        if (Thread.currentThread() != getExclusiveOwnerThread()) //執行releas的不是獲取鎖的獨佔執行緒,丟擲異常
            throw new IllegalMonitorStateException();
        boolean free = false;   //free用來標記鎖是否可獲取狀態
        if (c == 0) {           //若state=0
            free = true;        //那麼當前鎖是可獲取的
            setExclusiveOwnerThread(null);  //設定當前鎖的獨佔執行緒為null
        }
        setState(c);            //設定當前state為c
        return free;            //返回鎖是否是可獲取狀態
    }
 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; //獲取當前執行緒對應節點的waitStatus
        if (ws < 0)              //將當前執行緒對應節點waitStatus置為0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;     //獲取當前執行緒對應節點的後繼節點s
        if (s == null || s.waitStatus > 0) { //若s為空或s的狀態是canceled
            s = null;                        //將s設定為null。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)  //此處從尾到頭進行遍歷,找到佇列最前列的節點且狀態不是Canceled,將其設定為s。但此處為何從尾部開始遍歷尚未弄清楚。
                    s = t;
        }
        if (s != null)      //若上述遍歷找到的s非空
            LockSupport.unpark(s.thread);  //呼叫lockSupport.unpark喚醒s對應的執行緒
    }

release方法的邏輯仍然可以用一個辦理完業務的人的後續動作來進行說明:

1.若A辦理業務後無其他業務需要辦理,那麼表示當前業務視窗是free的。
2.A將自己的等待狀態置為0,相當於退出佇列。然後檢查自己後面的人是否是空或者取消排隊的狀態。若為真,將後置設為空。
3.從佇列的尾部遍歷到頭部,直到找到佇列最前頭的那個,且它的等待狀態不是取消狀態,那麼將其喚醒,告知他可以開始辦理業務了。

4.關於原始碼的一點疑問

本文中部分原始碼本人暫時也尚未能理解,希望各位大佬不吝賜教,主要有以下一些問題:
1.在unparkSuccessor方法中,找到佇列下一個節點並將其喚醒時,為什麼從尾到頭遍歷

        if (s == null || s.waitStatus > 0) { //若s為空或s的狀態是canceled
            s = null;                        //將s設定為null。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)    //倒序遍歷?
                    s = t;
        }

2.在acquireQueued方法中,自旋結束後的finally程式碼塊的作用。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //是否獲取到資源
    try {
        boolean interrupted = false; //等待過程中是否被中斷
        //自旋,維護等待佇列中執行緒的執行。
        for (;;) {
            final Node p = node.predecessor(); //獲取node的前置p
            if (p == head && tryAcquire(arg)) { //若前置p為頭結點並且重新獲取鎖成功
                setHead(node);                  //設定新的頭節點為node
                p.next = null; // help GC       //取消p和連結串列的連結
                failed = false;                 //獲取資源未失敗
                return interrupted;             //等待過程未被中斷
            }
            if (shouldParkAfterFailedAcquire(p, node) &&  //若前置節點是Node.SIGNAL狀態
                parkAndCheckInterrupt())        //將節點設定為Waitting狀態
                interrupted = true;   //此時執行緒中斷狀態為true
        }
    } finally { 
        if (failed)                       //如果自旋結束,那麼說明failed = false已經執行了,那麼這個canclAcquire方法什麼情況下會執行?
            cancelAcquire(node);
    }
}