1. 程式人生 > 其它 >011Java併發包012AQS

011Java併發包012AQS

本文主要了解了併發中的AQS抽象類。

注意:本文基於JDK1.8進行記錄。

1 簡介

1.1 是什麼

AQS是英文單詞AbstractQueuedSynchronizer的縮寫,翻譯過來就是抽象的佇列式的同步器,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock、Semaphore、CountDownLatch等等。

AQS是用來構建鎖或者其它同步器元件的重量級基礎框架及整個JUC體系的基石,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作,並通過一個state整型變量表示持有鎖的狀態。

1.2 抽象

AQS的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。

1.3 原理

搶到資源的執行緒直接使用處理業務邏輯,搶不到資源的必然涉及一種排隊等候機制。既然說到了排隊等候機制,那麼就一定會有某種佇列形成,這樣的佇列是什麼資料結構呢。

如果共享資源被佔用,就需要一定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是CLH佇列的變體實現的,將暫時獲取不到鎖的執行緒加入到佇列中,這個佇列就是AQS的抽象表現。它將請求共享資源的執行緒封裝成佇列的Node結點,通過CAS、自旋以及LockSupport的憑證機制,維護state變數的狀態,使併發達到同步的控制效果。

CLH:Craig、Landin、Hagersten(三個科學家名字)佇列,原版是一個單向連結串列,AQS中的佇列是CLH變體的虛擬雙向佇列FIFO,其頭節點在初始化後變為空節點。

1.4 資源使用方式

AQS定義兩種資源使用方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。

2 體系架構

 1 public abstract class AbstractQueuedSynchronizer
 2   extends AbstractOwnableSynchronizer
 3   implements java.io.Serializable {
 4   ...
 5   // 內部封裝Node節點
 6   static final class
Node { 7 // 標記執行緒以共享的模式等待鎖 8 static final Node SHARED = new Node(); 9 // 標記執行緒以獨佔的模式等待鎖 10 static final Node EXCLUSIVE = null; 11 // waitStatus取值為1表示執行緒取消(超時、中斷),被取消的節點不會阻塞 12 static final int CANCELLED = 1; 13 // waitStatus取值為-1表示後繼節點已經準備完成,等待執行緒釋放資源 14 static final int SIGNAL = -1; 15 // waitStatus取值為-2表示執行緒在Condition佇列中阻塞,當其他執行緒呼叫了Condition中的喚醒方法後,將節點從Condition佇列轉移到CLH等待佇列(Condition中有使用) 16 static final int CONDITION = -2; 17 // waitStatus取值為-3表示執行緒及後續執行緒無條件傳播(共享模式可用,CountDownLatch中有使用) 18 static final int PROPAGATE = -3; 19 // 執行緒的等待狀態,初始值為0 20 volatile int waitStatus; 21 // 前驅節點 22 volatile Node prev 23 // 後繼節點 24 volatile Node next; 25 // 執行緒物件 26 volatile Thread thread; 27 ... 28 } 29 // 頭節點 30 private transient volatile Node head 31 // 尾節點 32 private transient volatile Node tail; 33 // 資源狀態,0表示可獲取,大於等於1表示已佔用 34 private volatile int state; 35 // 獲取資源狀態 36 protected final int getState() { 37 return state; 38 } 39 // 設定資源狀態 40 protected final void setState(int newState) { 41 state = newState; 42 } 43 // CAS設定資源狀態 44 protected final boolean compareAndSetState(int expect, int update) { 45 // See below for intrinsics setup to support this 46 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 47 } 48 ... 49 }

AQS使用了一個volatile修飾的整型變數state用來表示同步狀態,通過內建的CLH同步佇列來完成執行緒的排隊工作。

其中,對state值的修改是通過CAS完成的,0表示資源可用,大於等於1表示資源不可用。AQS提供了三種操作state的方法:getState()、setState()、compareAndSetState()。

當前執行緒根據state的值判斷能否獲取資源,如果獲取失敗,AQS會將當前執行緒thread以及等待狀態waitStatus等資訊封裝成Node節點,並將其加CLH入同步佇列,同時阻塞當前執行緒。當state的值變為可獲取資源後,會把Node節點中的執行緒喚醒,再次嘗試獲取資源。

3 Lock與AQS

Lock介面的實現類,基本都是通過聚合了一個佇列同步器的子類完成執行緒訪問控制的。

 1 public class ReentrantLock implements Lock, java.io.Serializable {
 2   ...
 3   abstract static class Sync extends AbstractQueuedSynchronizer {
 4     ...
 5     final boolean nonfairTryAcquire(int acquires) {
 6       final Thread current = Thread.currentThread();
 7       int c = getState();
 8       if (c == 0) {
 9         if (compareAndSetState(0, acquires)) {
10           setExclusiveOwnerThread(current);
11           return true;
12         }
13       }
14       else if (current == getExclusiveOwnerThread()) {
15         int nextc = c + acquires;
16         if (nextc < 0) // overflow
17           throw new Error("Maximum lock count exceeded");
18         setState(nextc);
19         return true;
20       }
21       return false;
22     }
23     ...
24   }
25   static final class NonfairSync extends Sync {
26     ...
27     final void lock() {
28       if (compareAndSetState(0, 1))
29         setExclusiveOwnerThread(Thread.currentThread());
30       else
31         acquire(1);
32     }
33     protected final boolean tryAcquire(int acquires) {
34       return nonfairTryAcquire(acquires);
35     }
36   }
37   static final class FairSync extends Sync {
38     ...
39     final void lock() {
40       acquire(1);
41     }
42     protected final boolean tryAcquire(int acquires) {
43       final Thread current = Thread.currentThread();
44       int c = getState();
45       if (c == 0) {
46         if (!hasQueuedPredecessors() &&
47           compareAndSetState(0, acquires)) {
48           setExclusiveOwnerThread(current);
49           return true;
50         }
51       }
52       else if (current == getExclusiveOwnerThread()) {
53         int nextc = c + acquires;
54         if (nextc < 0)
55           throw new Error("Maximum lock count exceeded");
56         setState(nextc);
57         return true;
58       }
59       return false;
60     }
61   }
62   public ReentrantLock() {
63     sync = new NonfairSync();
64   }
65   public ReentrantLock(boolean fair) {
66     sync = fair ? new FairSync() : new NonfairSync();
67   }
68   ...
69 }

ReentrantLock類的內部聚合了一個Sync類,Sync類繼承了AQS類,並且非公平鎖NonfairSync和公平鎖FairSync都繼承自Sync,預設建立的是非公平鎖NonfairSync。

4 分析ReentrantLock

4.1 概述

整個ReentrantLock的加鎖過程,可以分為三個階段:

1)嘗試加鎖。

2)加鎖失敗,執行緒入佇列。

3)執行緒入佇列後,進入阻賽狀態。

4.2 場景舉例

舉例三個客戶在銀行辦理業務,使用預設的非公平鎖:

 1 public static void main(String[] args) {
 2   Lock lock = new ReentrantLock();
 3   new Thread(()->{
 4     lock.lock();
 5     try {
 6       System.out.println(Thread.currentThread().getName() + "-----辦理業務");
 7       try {
 8         TimeUnit.SECONDS.sleep(60);
 9       } catch (InterruptedException e) {
10         e.printStackTrace();
11       }
12       System.out.println(Thread.currentThread().getName() + "-----離開");
13     } finally {
14       lock.unlock();
15     }
16   }, "A").start();
17   new Thread(()->{
18     lock.lock();
19     try {
20       System.out.println(Thread.currentThread().getName() + "-----辦理業務");
21       try {
22         TimeUnit.SECONDS.sleep(60);
23       } catch (InterruptedException e) {
24         e.printStackTrace();
25       }
26       System.out.println(Thread.currentThread().getName() + "-----離開");
27     } finally {
28       lock.unlock();
29     }
30   }, "B").start();
31   new Thread(()->{
32     lock.lock();
33     try {
34       System.out.println(Thread.currentThread().getName() + "-----辦理業務");
35       try {
36         TimeUnit.SECONDS.sleep(60);
37       } catch (InterruptedException e) {
38         e.printStackTrace();
39       }
40       System.out.println(Thread.currentThread().getName() + "-----離開");
41     } finally {
42       lock.unlock();
43     }
44   }, "C").start();
45 }

5 程式分析

5.1 執行緒A開始並執行

5.1.1 獲取資源

執行緒A進入,呼叫lock()方法,檢視實現:

1 final void lock() {
2   // 使用CAS設定state為1
3   if (compareAndSetState(0, 1))
4     // 表示獲取資源成功,將當前執行緒設為佔用執行緒
5     setExclusiveOwnerThread(Thread.currentThread());
6   else
7     // 表示獲取資源失敗,繼續搶佔資源
8     acquire(1);
9 }

因為執行緒A是第一個獲取資源的執行緒,所以使用compareAndSetState()方法設定成功,繼續呼叫setExclusiveOwnerThread()方法將當前執行緒設為佔用執行緒,然後繼續執行業務。

5.2 執行緒B開始並阻塞

5.2.1 獲取資源

執行緒B進入,呼叫lock()方法。

因為執行緒B是第二個獲取資源的執行緒,執行緒A已經將state從0改為了1,所以使用compareAndSetState()方法設定失敗,繼續呼叫acquire()方法獲取資源,檢視實現:

1 public final void acquire(int arg) {
2   // 搶佔資源
3   if (!tryAcquire(arg) &&
4     // 加入等待佇列
5     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
6     // 執行緒阻塞
7     selfInterrupt();
8 }

如果搶佔資源成功,呼叫tryAcquire()方法返回true,判斷條件結束,繼續執行業務。

如果搶佔資源失敗,繼續判斷acquireQueued()方法返回。執行addWaiter()方法並傳入引數表示使用獨佔模式將執行緒加入到等待佇列。

5.2.2 搶佔資源

執行緒B進入,繼續呼叫acquire()方法獲取資源,執行tryAcquire()方法,檢視實現:

1 protected final boolean tryAcquire(int acquires) {
2   // 繼續呼叫非公平鎖的嘗試搶佔方法
3   return nonfairTryAcquire(acquires);
4 }

繼續呼叫非公平鎖的nonfairTryAcquire()方法,返回false表示佔用失敗:

 1 final boolean nonfairTryAcquire(int acquires) {
 2   // 記錄當前執行緒
 3   final Thread current = Thread.currentThread();
 4   // 記錄當前資源狀態
 5   int c = getState();
 6   // 0表示當前資源可用
 7   if (c == 0) {
 8     // 使用CAS設定state為請求數
 9     if (compareAndSetState(0, acquires)) {
10       // 表示獲取資源成功,將當前執行緒設為佔用執行緒
11       setExclusiveOwnerThread(current);
12       return true;
13     }
14   }
15   // 大於等於1表示當前資源被佔用,判斷當前執行緒是否為佔用執行緒(可重入鎖的情況)
16   else if (current == getExclusiveOwnerThread()) {
17     // 當前執行緒為佔用執行緒,記錄資源狀態
18     int nextc = c + acquires
19     // 判斷是否溢位
20     if (nextc < 0) // overflow
21       throw new Error("Maximum lock count exceeded");
22     // 設定state為新的資源狀態
23     setState(nextc);
24     return true;
25   }
26   return false;
27 }

5.2.3 進入等待

執行緒B進入,繼續呼叫addWaiter()方法將當前執行緒加入等待佇列,檢視實現:

 1 private Node addWaiter(Node mode) {
 2   // 將當前執行緒和傳入的獨佔模式封裝為節點
 3   Node node = new Node(Thread.currentThread(), mode);
 4   // Try the fast path of enq; backup to full enq on failure
 5   Node pred = tail;
 6   // 尾節點不為空,表示CLH佇列已經初始化,CAS操作將當前節點設為尾節點
 7   if (pred != null) {
 8     node.prev = pred;
 9     if (compareAndSetTail(pred, node)) {
10       pred.next = node;
11       return node;
12     }
13   }
14   // 尾節點為空,表示CLH佇列還未初始化,初始化佇列
15   enq(node);
16   return node;
17 }

因為執行緒B是第一個進入等待的執行緒,尾節點為空,繼續檢視enq()方法:

 1 private Node enq(final Node node) {
 2   for (;;) {
 3     Node t = tail;
 4     // 尾節點為空,通過CAS設定頭節點和尾節點為空節點
 5     if (t == null) { // Must initialize
 6       if (compareAndSetHead(new Node()))
 7         tail = head;
 8     } else {
 9       // 尾節點不為空,通過CAS將當前節點作為新的尾節點
10       node.prev = t;
11       if (compareAndSetTail(t, node)) {
12         t.next = node;
13         return t;
14       }
15     }
16   }
17 }

初始化CLH佇列後,頭節點為空節點,尾節點為當前節點。

5.2.4 阻塞執行緒

執行緒B得到當前節點後,作為引數傳入acquireQueued()方法繼續執行:

 1 final boolean acquireQueued(final Node node, int arg) {
 2   // 記錄當前節點是否取消,預設為true,表示取消
 3   boolean failed = true;
 4   try {
 5     // 標記當前節點是否中斷,預設為false,表示當前節點沒有中斷
 6     boolean interrupted = false
 7     // 自旋
 8     for (;;) {
 9       // 獲取當前節點的上一節點
10       final Node p = node.predecessor();
11       // 如果當前節點是頭節點,表示當前節點即將被喚醒,嘗試搶佔資源
12       if (p == head && tryAcquire(arg)) {
13         // 將當前節點設為頭節點,置空當前節點的上一節點,並取消當前節點同當前執行緒的繫結
14         setHead(node);
15         // 將原頭節點的下一節點置空,方便GC回收
16         p.next = null; // help GC
17         // 標記當前節點為false,表示沒有取消
18         failed = false;
19         // 返回false,表示當前節點沒有中斷
20         return interrupted;
21       }
22       // 不管當前節點是不是頭節點,執行到這裡就表示獲取資源失敗,處理前置節點並阻塞當前節點
23       if (shouldParkAfterFailedAcquire(p, node) &&
24         parkAndCheckInterrupt())
25         // 標記為true,表示當前節點中斷
26         interrupted = true;
27     }
28   } finally {
29     // 當前節點如果被取消,執行取消操作
30     if (failed)
31       cancelAcquire(node);
32   }
33 }

因為執行緒B是第一個進入等待的執行緒,上一節點為頭節點,嘗試獲取資源。獲取成功則將當前節點作為頭節點並移除當前執行緒,獲取失敗則進入判斷。

在判斷條件中呼叫shouldParkAfterFailedAcquire()方法,處理前置節點:

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2   // 記錄上一節點的等待狀態
 3   int ws = pred.waitStatus;
 4   if (ws == Node.SIGNAL)
 5     // 如果上一節點的等待狀態為-1,表示當前執行緒可以被阻塞,返回true,執行parkAndCheckInterrupt()方法
 6     return true;
 7   if (ws > 0) {
 8     // 如果上一節點的等待狀態為1,表示上一節點被取消,迴圈移除被取消的上一節點
 9     do {
10       node.prev = pred = pred.prev;
11     } while (pred.waitStatus > 0);
12     pred.next = node;
13   } else {
14     // 上述條件不滿足,表示上一節點的等待狀態為0或者-3,通過CAS將等待狀態設定為-1
15     compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
16   }
17   // 返回false,跳過parkAndCheckInterrupt()方法,重新進入自旋
18   return false;
19 }

因為執行緒B是第一個進入等待的執行緒,上一節點為頭節點,頭節點為空節點,等待狀態為0,所以兩次進入此方法。

第一次進入shouldParkAfterFailedAcquire()方法將上一節點的等待狀態設定為-1後返回false,條件判斷為false重新進入自旋。

第二次進入shouldParkAfterFailedAcquire()方法檢測到上一節點的等待狀態為-1,返回true,繼續判斷parkAndCheckInterrupt()方法。

在判斷條件中呼叫parkAndCheckInterrupt()方法,阻塞當前節點:

1 private final boolean parkAndCheckInterrupt() {
2   // 使用LockSupport的park()方法阻塞當前節點
3   LockSupport.park(this);
4   // 返回執行緒的中斷狀態
5   return Thread.interrupted();
6 }

執行緒B在此被阻塞。

5.3 執行緒C開始並阻塞

5.3.1 獲取資源

執行緒C進入,呼叫lock()方法。

因為執行緒C是第三個獲取資源的執行緒,執行緒A已經將state從0改為了1,所以使用compareAndSetState()方法設定失敗,繼續呼叫acquire()方法獲取資源。

如果搶佔資源成功,呼叫tryAcquire()方法返回true,判斷條件結束,繼續執行業務。

如果搶佔資源失敗,繼續判斷acquireQueued()方法返回。執行addWaiter()方法並傳入引數表示使用獨佔模式將執行緒加入到等待佇列。

5.3.2 搶佔資源

執行緒C進入,繼續呼叫acquire()方法獲取資源,執行tryAcquire()方法。

繼續呼叫非公平鎖的nonfairTryAcquire()方法,返回false表示佔用失敗。

5.3.3 進入等待

執行緒C進入,繼續呼叫addWaiter()方法將當前執行緒加入等待佇列。

因為執行緒C是第二個進入等待的執行緒,執行緒B已經完成了佇列初始化,尾節點不為空,將當前節點作為新的尾節點。

5.3.4 阻塞執行緒

執行緒C得到當前節點後,作為引數傳入acquireQueued()方法繼續執行。

因為執行緒C是第二個進入等待的執行緒,上一節點不為頭節點,直接進入判斷。

在判斷條件中呼叫shouldParkAfterFailedAcquire()方法,處理前置節點,將B節點的等待狀態設為-1,返回true,繼續判斷parkAndCheckInterrupt()方法。

在判斷條件中呼叫parkAndCheckInterrupt()方法,阻塞當前節點。

執行緒C在此被阻塞。

5.4 執行緒A結束

5.4.1 解鎖資源

執行緒A執行完畢,呼叫unlock()方法釋放資源並喚醒執行緒,檢視實現:

1 public void unlock() {
2   sync.release(1);
3 }

繼續檢視release()方法:

 1 public final boolean release(int arg) {
 2   // 呼叫tryRelease()方法嘗試釋放資源
 3   if (tryRelease(arg)) {
 4     // 獲取頭節點
 5     Node h = head;
 6     // 如果頭節點不為空,並且等待狀態不為0,表示需要喚醒其他執行緒
 7     if (h != null && h.waitStatus != 0)
 8       // 呼叫unparkSuccessor()方法並傳入頭節點,喚醒執行緒
 9       unparkSuccessor(h);
10     return true;
11   }
12   // 釋放失敗返回false
13   return false;
14 }

5.4.2 釋放資源

繼續檢視tryRelease()方法:

 1 protected final boolean tryRelease(int releases) {
 2   // 記錄資源狀態
 3   int c = getState() - releases;
 4   // 如果當前執行緒不為佔用執行緒則丟擲異常
 5   if (Thread.currentThread() != getExclusiveOwnerThread())
 6     throw new IllegalMonitorStateException();
 7   // 標記資源空閒,預設為false
 8   boolean free = false
 9   // 資源狀態為0則標記資源空閒為true,並將佔用執行緒置空
10   if (c == 0) {
11     free = true;
12     setExclusiveOwnerThread(null);
13   }
14   // 設定資源狀態
15   setState(c);
16   // 返回資源空閒
17   return free;
18 }

執行緒A釋放資源並返回true,繼續執行。

5.4.3 喚醒執行緒

因為執行緒B和執行緒C已經進入等待佇列,所以頭節點不為空,繼續檢視unparkSuccessor()方法:

 1 private void unparkSuccessor(Node node) {
 2   // 記錄頭節點的等待狀態
 3   int ws = node.waitStatus;
 4   // 如果頭節點的等待狀態小於0,則將頭節點的等待狀態設為0
 5   if (ws < 0)
 6     compareAndSetWaitStatus(node, ws, 0);
 7   // 記錄頭節點的下一節點
 8   Node s = node.next;
 9   // 判斷下一節點是否為空或者下一節點的等待狀態是否大於0
10   if (s == null || s.waitStatus > 0) {
11     s = null;
12     // 遍歷下一節點,找到不為空並且等待狀態小於等於0的節點,將其設為下一節點
13     for (Node t = tail; t != null && t != node; t = t.prev)
14       if (t.waitStatus <= 0)
15         s = t;
16   }
17   // 如果下一節點不為空,則使用LockSupport的unpark()方法喚醒下一節點中的執行緒
18   if (s != null)
19     LockSupport.unpark(s.thread);
20 }

頭節點的下一節點為執行緒B所在的節點,執行緒B被喚醒。

5.5 執行緒B執行並結束

5.5.1 搶佔資源

執行緒B在parkAndCheckInterrupt()方法中被釋放後,返回中斷狀態為false,重新進入自旋:

 1 final boolean acquireQueued(final Node node, int arg) {
 2   // 記錄當前節點是否取消,預設為true,表示取消
 3   boolean failed = true;
 4   try {
 5     // 標記當前節點是否中斷,預設為false,表示當前節點沒有中斷
 6     boolean interrupted = false
 7     // 自旋
 8     for (;;) {
 9       // 獲取當前節點的上一節點
10       final Node p = node.predecessor();
11       // 如果當前節點是頭節點,表示當前節點即將被喚醒,嘗試搶佔資源
12       if (p == head && tryAcquire(arg)) {
13         // 將當前節點設為頭節點,置空當前節點的上一節點,並取消當前節點同當前執行緒的繫結
14         setHead(node);
15         // 將原頭節點的下一節點置空,方便GC回收
16         p.next = null; // help GC
17         // 標記當前節點為false,表示沒有取消
18         failed = false;
19         // 返回false,表示當前節點沒有中斷
20         return interrupted;
21       }
22       // 不管當前節點是不是頭節點,執行到這裡就表示獲取資源失敗,處理前置節點並阻塞當前節點
23       if (shouldParkAfterFailedAcquire(p, node) &&
24         parkAndCheckInterrupt())
25         // 標記為true,表示當前節點中斷
26         interrupted = true;
27     }
28   } finally {
29     // 當前節點如果被取消,執行取消操作
30     if (failed)
31       cancelAcquire(node);
32   }
33 }

因為執行緒B的上一節點為頭節點,進入tryAcquire()方法搶佔資源,搶佔成功返回true並將當前節點設為頭節點,同時解除同線程B的繫結。

5.5.2 解鎖資源

執行緒B執行完畢,呼叫unlock()方法釋放資源並喚醒執行緒。

頭節點的下一節點為執行緒C所在的節點,執行緒C被喚醒。

5.6 執行緒C執行並結束

5.6.1 搶佔資源

執行緒C在parkAndCheckInterrupt()方法中被釋放後,返回中斷狀態為false,重新進入自旋。

因為執行緒C的上一節點為頭節點,進入tryAcquire()方法搶佔資源,搶佔成功返回true並將當前節點設為頭節點,同時解除同線程C的繫結。

5.6.2 解鎖資源

執行緒C執行完畢,呼叫unlock()方法釋放資源並喚醒執行緒。

頭節點的下一節點為空,不會有任何執行緒被喚醒。

6 公平鎖與非公平鎖

6.1 非公平鎖

非公平鎖的執行緒在獲取資源時,會嘗試獲取資源,如果成功則立刻佔用資源,如果失敗則嘗試佔用資源。

在資源可用時不會判斷當前佇列是否有執行緒在等待,也就是說剛加入的執行緒可以同喚醒的執行緒競爭資源。

 1 final void lock() {
 2   if (compareAndSetState(0, 1))
 3     setExclusiveOwnerThread(Thread.currentThread());
 4   else
 5     acquire(1);
 6 }
 7 ...
 8 public final void acquire(int arg) {
 9   if (!tryAcquire(arg) &&
10     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
11     selfInterrupt();
12 }
13 ...
14 protected final boolean tryAcquire(int acquires) {
15   return nonfairTryAcquire(acquires);
16 }
17 ...
18 final boolean nonfairTryAcquire(int acquires) {
19   final Thread current = Thread.currentThread();
20   int c = getState();
21   if (c == 0) {
22     if (compareAndSetState(0, acquires)) {
23       setExclusiveOwnerThread(current);
24       return true;
25     }
26   }
27   else if (current == getExclusiveOwnerThread()) {
28     int nextc = c + acquires;
29     if (nextc < 0) // overflow
30       throw new Error("Maximum lock count exceeded");
31     setState(nextc);
32     return true;
33   }
34   return false;
35 }

6.2 公平鎖

公平鎖的執行緒在獲取資源時,不會嘗試獲取資源,而是嘗試佔用資源。

在資源可用時會判斷當前佇列是否有執行緒在等待,剛加入的執行緒不可用同喚醒的執行緒競爭資源。

 1 final void lock() {
 2   acquire(1);
 3 }
 4 ...
 5 public final void acquire(int arg) {
 6   if (!tryAcquire(arg) &&
 7     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 8     selfInterrupt();
 9 }
10 ...
11 protected final boolean tryAcquire(int acquires) {
12   final Thread current = Thread.currentThread();
13   int c = getState();
14   if (c == 0) {
15     if (!hasQueuedPredecessors() &&
16       compareAndSetState(0, acquires)) {
17       setExclusiveOwnerThread(current);
18       return true;
19     }
20   }
21   else if (current == getExclusiveOwnerThread()) {
22     int nextc = c + acquires;
23     if (nextc < 0)
24       throw new Error("Maximum lock count exceeded");
25     setState(nextc);
26     return true;
27   }
28   return false;
29 }

6.3 分析hasQueuedPredecessors()方法

比較NonfairSync和FairSync中tryAcquire()方法的實現,發現FairSync中的tryAcquire()方法中多了一項判斷:

1 !hasQueuedPredecessors()

hasQueuedPredecessors()方法用於公平鎖加鎖時判斷等待佇列中是否存在有效節點的方法。該方法用於判斷當前節點前是否有其他節點排隊:

返回false表示沒有,取反後為true表示當前節點不需要排隊,需要繼續執行佔用資源的操作。

返回true表示有,取反後為false表示當前節點需要排隊,需要執行加入等待佇列的操作。

檢視AQS中定義的hasQueuedPredecessors()方法:

 1 public final boolean hasQueuedPredecessors() {
 2   // The correctness of this depends on head being initialized
 3   // before tail and on head.next being accurate if the current
 4   // thread is first in queue.
 5   Node t = tail; // Read fields in reverse initialization order
 6   Node h = head;
 7   Node s;
 8   return h != t &&
 9     ((s = h.next) == null || s.thread != Thread.currentThread());
10 }

判斷h是否不等於t,如果不成立,說明h等於t,說明頭節點和尾節點相同,說明當前佇列未初始化(頭節點和尾節點都是空節點)或者當前佇列只有一個節點(頭結點和尾節點都是空節點),說明不需要排隊,返回false,取反後為true,嘗試佔用資源。

判斷h是否不等於t,如果成立,說明h不等於t,說明存在兩個不同節點。繼續判斷頭節點的下一節點是否為空節點,如果成立,說明下一節點為空,可能上個執行緒在執行初始化enq()方法,剛剛通過CAS操作compareAndSetHead()將頭節點初始化,尚未給尾節點賦值,此時頭節點不為空,尾節點為空,並且頭節點的下一節點為空,返回true,取反後為false,需要排隊。

判斷h是否不等於t,如果成立,說明h不等於t,說明存在兩個不同節點。繼續判斷頭節點的下一節點是否為空節點,如果不成立,說明下一節點不為空。繼續判斷下一節點封裝的執行緒是否不等於當前執行緒,如果成立,說明下一執行緒不為當前執行緒,返回true,取反後為false,需要排隊。

判斷h是否不等於t,如果成立,說明h不等於t,說明存在兩個不同節點。繼續判斷頭節點的下一節點是否為空節點,如果不成立,說明下一節點不為空。繼續判斷下一節點封裝的執行緒是否不等於當前執行緒,如果不成立,說明下一執行緒為當前執行緒,返回false,取反後為true,嘗試佔用資源。

7 自定義同步器

7.1 實現方法

不同的自定義同步器爭用共享資源的方式也不同,自定義同步器在實現時只需要實現共享資源state的獲取與釋放即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊和喚醒出隊等),AQS已經在底層實現好了。

自定義同步器實現時主要實現以下幾種方法:

isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。

tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。

tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。

tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

一般來說,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

7.2 舉例說明

7.2.1 ReentrantLock

以ReentrantLock為例,state初始化為0,表示未鎖定狀態。

當執行緒A呼叫lock()方法獲取資源時,會呼叫tryAcquire()佔用資源,並將state的值加1。

此後,其他執行緒再tryAcquire()時就會失敗,直到執行緒A呼叫unlock()方法釋放資源,並將state的值減0,其它執行緒才有機會獲取該鎖。

當然,釋放鎖之前,A執行緒自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多少次,這樣才能保證state是能回到零態的。

7.2.2 CountDownLatch

再以CountDownLatch以例,任務分為N個子執行緒去執行,state也初始化為N(N與執行緒個數一致)。這N個子執行緒是並行執行的,每個子執行緒執行完後都會呼叫一次countDown()方法,使用CAS操作將state值減1。等到所有子執行緒都執行完後,state的值變為0,這時會呼叫unpark()方法喚醒主執行緒,然後主執行緒就會從await()方法喚醒,繼續後餘動作。