1. 程式人生 > >AQS系列(一)- ReentrantLock的加鎖

AQS系列(一)- ReentrantLock的加鎖

前言

    AQS即AbstractQueuedSynchronizer,是JUC包中的一個核心抽象類,JUC包中的絕大多數功能都是直接或間接通過它來實現的。本文是AQS系列的第一篇,後面會持續更新多篇,爭取將JUC包中AQS相關的常用功能講清楚,一方面鞏固自己的知識體系,一方面亦可與各位園友互相學習。寒冷的冬天,要用技術來溫暖自己。

一、AQS與ReentrantLock的關係

    先奉上一張自制的醜陋類圖

 

 

 

     從下往上看,ReentrantLock類內部有兩個靜態內部類FairSync和NonfairSync,分別代表了公平鎖和非公平鎖(注意ReentrantLock實現的鎖是可重入排它鎖)。這兩個靜態內部類又共同繼承了ReentrantLock的一個內部靜態抽象類Sync,此抽象類繼承AQS。

    類的關係搞清楚了,我們下面一起看一下原始碼。

二、原始碼解讀

    ReentrantLock的預設構造方法建立的是非公平鎖,也可以通過傳入true來指定生成公平鎖。下面我們以公平鎖的加鎖過程為例,進行解讀原始碼。在解讀原始碼之前需要先明確一下AQS中的state屬性,它是int型別,state=0表示當前lock沒有被佔用,state=1表示被佔用,如果是重入狀態,則重入了幾次state就是幾。

 1 public class JucLockDemo1 {
 2     public static void main(String[] args){
 3         ReentrantLock lock = new ReentrantLock(true);
 4         Thread t1 = new Thread(() -> {
 5             lock.lock();
 6             // 業務邏輯
 7             lock.unlock();
 8         });
 9         t1.start();
10         System.out.println("main end");
11     }
12 }

    其中第5行lock方法點進去的程式碼:

1 public void lock() {
2         sync.lock();
3     }

    直接調了sync的lock方法,sync下面的lock方法是抽象方法,方法邏輯取決於具體的實現類,因為我們這裡建立的是公平鎖,所以進FairSync看它的lock方法實現:

1 final void lock() {
2             acquire(1);
3         }

    FairSync中的lock方法很簡單,直接呼叫了acquire方法,引數是1,繼續跟蹤:

1 public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

    acquire方法位於AQS中,很重要,雖然只有短短的三行,但是裡面的內容非常多。下面對裡面的方法分別進行解讀。

方法1:tryAcquire(arg)

    此方法在FairSync中進行了實現,程式碼如下所示:

 1 protected final boolean tryAcquire(int acquires) {
 2             final Thread current = Thread.currentThread();
 3             int c = getState();
 4             // 判斷state狀態,如果是0表示鎖空閒,可以去嘗試獲取
 5             if (c == 0) {
 6                 if (!hasQueuedPredecessors() &&
 7                     compareAndSetState(0, acquires)) {
 8                     setExclusiveOwnerThread(current);
 9                     return true;
10                 }
11             }// exclusiceOwnerThread存放的是當前執行的獨佔執行緒,如果此處判斷為true,說明是當前執行緒第二次加鎖,可以重入,只是要將state+1
12             else if (current == getExclusiveOwnerThread()) {
13                 int nextc = c + acquires;
14                 if (nextc < 0)
15                     throw new Error("Maximum lock count exceeded");
16                 setState(nextc);
17                 return true;
18             }
19             return false;
20         }

    第二個if判斷很好理解,是ReentrantLock對重入和排他的支援(所以說它是可重入排他鎖),但是判斷c==0之後的邏輯就比較麻煩了。

    首先理解一下當前的邏輯:如果state=0說明lock空閒,又因為是公平鎖,所以要先判斷當前AQS佇列中還有沒有排隊的任務,如果沒有的話,就走一個CAS將state改成1,然後設定排他的執行執行緒,獲取執行權;如果佇列中有任務,那麼acquire方法只能先返回false了。那麼可以推斷出,hasQueuedPredecessors方法就是用來判斷佇列中是否有排隊的。

    點進去看看Lea大神的實現邏輯吧。

 1 public final boolean hasQueuedPredecessors() {
 2         // The correctness of this depends on head being initialized
 3         // before tail and on head.next being accurate if the current
 4         // thread is first in queue.
 5         Node t = tail; // Read fields in reverse initialization order
 6         Node h = head;
 7         Node s;
 8         return h != t &&
 9             ((s = h.next) == null || s.thread != Thread.currentThread());
10     }

    程式碼不多,但表達的意思比較晦澀。第一個判斷h!=t,如果h=t,說明佇列是空的,這時這個判斷條件是false,方法直接就返回了,這時外面的if取反是true,會繼續走CAS搶佔state和排他執行緒,獲取鎖,這種情況的路就走完了。如果h!=t為true,說明現在佇列中有任務,這時進入後面的大括號 ((s = h.next) == null || s.thread != Thread.currentThread()) ,在佇列中有任務的情況下,還有兩種可能,一種是佇列中的第一個任務就是當前執行緒,另一種是第一個任務不是當前執行緒。因為是公平鎖,如果第一個任務時當前執行緒的話,那麼它有權再去申請一下獲取鎖,如果第一個任務不是當前執行緒,那麼當前執行緒就乖乖排隊吧,等前面的執行完了才能輪到你。後面的大括號就是對這兩種情況進行了區分,我們用反向邏輯來分析。方法hasQueuedPredecessors表示如果當前執行緒可以去競爭鎖則返回false,不能競爭鎖則返回true後面大括號結果為false的話當前執行緒才會去搶佔鎖,一個或運算怎樣才能是false?或的兩邊都是false,就是說要(s = h.next) != null && s.thread == Thread.currentThread(),意思就是佇列中第一個任務不為空且第一個任務就是當前執行緒,而這個&&的非與上述原始碼中的||在邏輯上是等價的,所以到這裡意思就清楚了,return的&&連線的兩個條件意思是:判斷是否佇列不為空且(第一個任務為空或者不是當前執行緒)。

    hasQueuedPredecessors方法講完,tryAcquire方法就沒有什麼難點了,這時我們回到上面開始的acquire(int arg)方法。如果tryAcquire返回的是true,說明獲取到了鎖,那麼就不會再走後面的流程了;如果返回的是false,則進入acquireQueue。但我們先看裡面的addWaiter方法。

方法2:  addWaiter(Node.EXCLUSIVE), arg)

     此方法用於生成當前執行緒的node節點並把它放在隊尾,方法原始碼:

 1 private Node addWaiter(Node mode) {
 2         Node node = new Node(Thread.currentThread(), mode);// 建立當前執行緒的node節點
 3         // Try the fast path of enq; backup to full enq on failure
 4         Node pred = tail;
 5         if (pred != null) { // 判斷隊尾是否為空,如果不為空則將node節點拼接在後面
 6             node.prev = pred; // 將node節點連線到隊尾節點
 7             if (compareAndSetTail(pred, node)) { // 通過CAS將node節點放到隊尾
 8                 pred.next = node; // 如果CAS操作成功了,那麼將原隊尾節點的next連線到node節點,組成雙向佇列
 9                 return node;
10             }
11         }
12         enq(node); // 能到這裡的話分兩種情況:1、隊尾是空的;2、隊尾不是空的,但是進行CAS操作時由於被其他執行緒搶佔導致失敗;
13         return node;
14     }

通過註解大家應該能梳理清楚邏輯,下面著重說一下enq(node)方法的實現:

 1 private Node enq(final Node node) {
 2         for (;;) {
 3             Node t = tail;
 4             if (t == null) { // Must initialize 隊尾是null,符合前面說的第一種情況
 5                 if (compareAndSetHead(new Node())) // 設定隊首
 6                     tail = head; // 隊首隊尾都初始化成空node
 7             } else { // 隊尾不為空,是前面說的第二種情況,此種情況的處理邏輯同上面對pred != null的處理
 8                 node.prev = t;
 9                 if (compareAndSetTail(t, node)) {
10                     t.next = node;
11                     return t;
12                 }
13             }
14         }
15     }

    可以看到此方法無限迴圈,直到執行完else中的邏輯。此處需要注意的一點是,如果剛開始時佇列是空的,即tail是null,會觸發隊首隊尾的初始化,初始化之後再一次迴圈會進入else中,將node放到原隊尾的後面,返回t。注意返回的t沒有用到,是在其他場景的方法中用的。

 方法3:acquireQueued(final Node node, int arg)

     該方法用於獲取鎖,返回值表示當前獲取到鎖的執行緒在獲取鎖的過程中是否中斷過,下面先看原始碼:

 1 final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {
 6                 final Node p = node.predecessor(); // 獲取當前節點的前一個節點
 7                 if (p == head && tryAcquire(arg)) { // 如果p==head說明node是第一個任務,那麼就可以通過tryAcquire去獲取鎖
 8                     setHead(node); // 獲取鎖成功,則將node放到隊首位置,並將thread和prev置為null
 9                     p.next = null; // help GC 再將p的next置為null,切斷與外界的一切聯絡
10                     failed = false;
11                     return interrupted;
12                 }// 下面if中的兩個方法很重要,著重講解
13                 if (shouldParkAfterFailedAcquire(p, node) &&
14                     parkAndCheckInterrupt())
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }

    通過註解,相信對第一個if中的邏輯能理解清楚,下我們著重講解第二個if中的兩個方法。

    第一個是 shouldParkAfterFailedAcquire(p, node) 方法,此方法的邏輯為:

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2         int ws = pred.waitStatus; // 1、對於新建的Node節點,此狀態都為0(只有addConditionWaiter新建node節點時才不是0)
 3         if (ws == Node.SIGNAL)
 4             // 3、在2中將ws置為-1後,該方法返回false,外層for迴圈再走一圈,第二次進入此方法時會進入這裡,直接返回true。 -1的狀態表示可以將當前執行緒park
 5             return true;
 6         if (ws > 0) {
 7 
 8             do {
 9                 node.prev = pred = pred.prev;
10             } while (pred.waitStatus > 0);
11             pred.next = node;
12         } else {
13             // 2、是ws=0的話會進入這裡,將ws置為-1,0的狀態表示還不能park
14             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
15         }
16         return false;
17     }

    如果返回的是true,則進入第二個方法將當前執行緒暫停:

1 private final boolean parkAndCheckInterrupt() {
2         LockSupport.park(this);
3         return Thread.interrupted();
4     }

    當前面的執行緒執行完畢,喚醒這個執行緒的時候,就會從第三行開始繼續執行for迴圈中獲取鎖的邏輯,直到獲取鎖。

 

    到這裡,ReentrantLock的lock方法便結束了,整體流程就是這樣。看JUC包中的原始碼,可以看到寫的很簡潔,有時一兩個簡單的判斷條件卻代表了非常多的意思,充分顯示了程式設計者縝密又舉重若輕的實力,讀這樣的原始碼,有一種看本格推理小說般的思維上的愉悅感。

    下一節我們將介紹unlock方法的原理,與本節最後一個方法就能接上了,下期再會!

 

 

&n