硬核乾貨:5W字17張高清圖理解同步器框架AbstractQueuedSynchronizer
阿新 • • 發佈:2020-07-24
## 前提
併發程式設計大師**Doug Lea**在編寫`JUC`(`java.util.concurrent`)包的時候引入了`java.util.concurrent.locks.AbstractQueuedSynchronizer`,其實是`Abstract Queued Synchronizer`,也就是"基於佇列實現的抽象同步器",一般我們稱之為`AQS`。其實`Doug Lea`大神編寫`AQS`是有嚴謹的理論基礎的,他的個人部落格上有一篇論文[《The java.util.concurrent Synchronizer Framewor》](http://gee.cs.oswego.edu/dl/papers/aqs.pdf),可以在網際網路找到相應的譯文《JUC同步器框架》,如果想要深入研究`AQS`必須要理解一下該論文的內容,然後結合論文內容詳細分析一下`AQS`的原始碼實現。本文在閱讀`AQS`原始碼的時候選用的`JDK`版本是`JDK11`。
> 出於寫作習慣,下文會把AbstractQueuedSynchronizer稱為AQS、JUC同步器框或者同步器框架。
## AQS的主要功能
`AQS`是`JUC`包中用於構建鎖或者其他同步元件(訊號量、事件等)的基礎框架類。`AQS`從它的實現上看主要提供了下面的功能:
- 同步狀態的原子性管理。
- 執行緒的阻塞和解除阻塞。
- 提供阻塞執行緒的儲存佇列。
基於這三大功能,衍生出下面的附加功能:
- 通過中斷實現的任務取消,此功能基於執行緒中斷實現。
- 可選的超時設定,也就是呼叫者可以選擇放棄等待任務執行完畢直接返回。
- 定義了`Condition介面`,用於支援管程形式的`await/signal/signalAll`操作,代替了`Object`類基於`JNI`提供的`wait/notify/notifyAll`。
`AQS`還根據同步狀態的不同管理方式區分為兩種不同的實現:**獨佔狀態的同步器**和**共享狀態的同步器**。
## 同步器框架基本原理
[《The java.util.concurrent Synchronizer Framework》](http://gee.cs.oswego.edu/dl/papers/aqs.pdf)一文中其實有提及到同步器框架的虛擬碼:
```java
// acquire操作如下:
while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
//release操作如下:
update synchronization state;
if (state may permit a blocked thread to acquire){
unblock one or more queued threads;
}
```
撇腳翻譯一下:
```java
// acquire操作如下:
while(同步狀態申請獲取失敗){
if(當前執行緒未進入等待佇列){
當前執行緒放入等待佇列;
}
嘗試阻塞當前執行緒;
}
當前執行緒移出等待佇列
//release操作如下:
更新同步狀態
if(同步狀態足夠允許一個阻塞的執行緒申請獲取){
解除一個或者多個等待佇列中的執行緒的阻塞狀態;
}
```
為了實現上述操作,需要下面三個基本環節的相互協作:
- 同步狀態的原子性管理。
- 等待佇列的管理。
- 執行緒的阻塞與解除阻塞。
其實基本原理很簡單,但是為了應對複雜的併發場景和併發場景下程式執行的正確性,同步器框架在上面的`acquire`操作和`release`操作中使用了**大量的死迴圈和`CAS`等操作**,再加上`Doug Lea`喜歡使用單行復雜的條件判斷程式碼,如**一個`if`條件語句會包含大量操作**,`AQS`很多時候會讓人感覺實現邏輯過於複雜。
### 同步狀態管理
`AQS`內部內部定義了一個`32`位整型的`state`變數用於儲存同步狀態:
```java
/**
* The synchronization state.(同步狀態值)
*/
private volatile int state;
// 獲取state
protected final int getState() {
return state;
}
// 直接覆蓋設定state
protected final void setState(int newState) {
state = newState;
}
// CAS設定state
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
```
同步狀態`state`在不同的實現中可以有不同的作用或者表示意義,這裡其實不能單純把它理解為中文意義上的"狀態",它可以代表資源數、鎖狀態等等,下文遇到具體的場景我們再分析它表示的意義。
### CLH佇列與變體
`CLH`鎖即`Craig, Landin, and Hagersten (CLH) locks`,因為它底層是基於佇列實現,一般也稱為`CLH`佇列鎖。`CLH`鎖也是一種基於連結串列的可擴充套件、高效能、公平的自旋鎖,申請執行緒僅僅在本地變數上自旋,它不斷輪詢前驅的狀態,假設發現前驅釋放了鎖就結束自旋。從實現上看,`CLH`鎖是一種自旋鎖,能確保無飢餓性,提供先來先服務的公平性。先看簡單的`CLH`鎖的一個簡單實現:
```java
public class CLHLock implements Lock {
AtomicReference tail = new AtomicReference<>(new QueueNode());
ThreadLocal pred;
ThreadLocal current;
public CLHLock() {
current = ThreadLocal.withInitial(QueueNode::new);
pred = ThreadLocal.withInitial(() -> null);
}
@Override
public void lock() {
QueueNode node = current.get();
node.locked = true;
QueueNode pred = tail.getAndSet(node);
this.pred.set(pred);
while (pred.locked) {
}
}
@Override
public void unlock() {
QueueNode node = current.get();
node.locked = false;
current.set(this.pred.get());
}
static class QueueNode {
boolean locked;
}
// 忽略其他介面方法的實現
}
```
上面是一個簡單的`CLH`佇列鎖的實現,內部類`QueueNode`只使用了一個簡單的布林值`locked`屬性記錄了每個執行緒的狀態,如果該屬性為`true`,則相應的執行緒要麼已經獲取到鎖,要麼正在等待鎖,如果該屬性為`false`,則相應的執行緒已經釋放了鎖。新來的想要獲取鎖的執行緒必須對`tail`屬性呼叫`getAndSet()`方法,使得自身成為佇列的尾部,同時得到一個指向前驅節點的引用`pred`,最後執行緒所在節點在其前驅節點的`locked`屬性上自旋,直到前驅節點釋放鎖。上面的實現是無法執行的,因為一旦自旋就會進入死迴圈導致`CPU`飆升,可以嘗試使用下文將要提到的`LockSupport`進行改造。
`CLH`佇列鎖本質是使用佇列(實際上是單向連結串列)存放等待獲取鎖的執行緒,等待的執行緒總是在其所在節點的前驅節點的狀態上自旋,直到前驅節點釋放資源。**從實際來看,過度自旋帶來的CPU效能損耗比較大,並不是理想的執行緒等待佇列的實現**。
![j-a-q-s-ex-17](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-17.png)
基於原始的`CLH`佇列鎖中提供的等待佇列的基本原理,**`AQS`實現一種了CLH鎖佇列的變體(Variant)**。`AQS`類的`protected`修飾的建構函式裡面有一大段註釋用於說明`AQS`實現的等待佇列的細節事項,這裡列舉幾點重要的:
- `AQS`實現的等待佇列沒有直接使用`CLH`鎖佇列,但是參考了其設計思路,等待節點會儲存前驅節點中執行緒的資訊,內部也會維護一個控制執行緒阻塞的狀態值。
- 每個節點都設計為一個持有單獨的等待執行緒並且"帶有具體的通知方式"的監視器,這裡所謂通知方式就是自定義喚醒阻塞執行緒的方式而已。
- 一個執行緒是等待佇列中的第一個等待節點的持有執行緒會嘗試獲取鎖,但是並不意味著它一定能夠獲取鎖成功(這裡的意思是存在公平和非公平的實現),獲取失敗就要重新等待。
- 等待佇列中的節點通過`prev`屬性連線前驅節點,通過`next`屬性連線後繼節點,簡單來說,就是**雙向連結串列的設計**。
- `CLH`佇列本應該需要一個虛擬的頭節點,但是在`AQS`中沒有直接提供虛擬的頭節點,而是延遲到第一次競爭出現的時候懶建立虛擬的頭節點(其實也會建立尾節點,初始化時頭尾節點是同一個節點)。
- `Condition`(條件)等待佇列中的阻塞執行緒使用的是相同的`Node`結構,但是提供了另一個連結串列用來存放,`Condition`等待佇列的實現比非`Condition`等待佇列複雜。
### 執行緒阻塞與喚醒
執行緒的阻塞和喚醒在`JDK1.5`之前,一般只能依賴於`Object`類提供的`wait()`、`notify()`和`notifyAll()`方法,它們都是`JNI`方法,由`JVM`提供實現,並且它們必須執行在獲取監視器鎖的程式碼塊內(`synchronized`程式碼塊中),這個侷限性先不談效能上的問題,程式碼的簡潔性和靈活性是比較低的。`JDK1.5`引入了`LockSupport`類,底層是基於`Unsafe`類的`park()`和`unpark()`方法,提供了執行緒阻塞和喚醒的功能,它的機制有點像只有一個允許使用資源的訊號量`java.util.concurrent.Semaphore`,也就是一個執行緒只能通過`park()`方法阻塞一次,只能呼叫`unpark()`方法解除呼叫阻塞一次,執行緒就會喚醒(多次呼叫`unpark()`方法也只會喚醒一次),可以想象是內部維護了一個0-1的計數器。
`LockSupport`類如果使用得好,可以提供更靈活的編碼方式,這裡舉個簡單的使用例子:
```java
public class LockSupportMain implements Runnable {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
private Thread thread;
private void setThread(Thread thread) {
this.thread = thread;
}
public static void main(String[] args) throws Exception {
LockSupportMain main = new LockSupportMain();
Thread thread = new Thread(main, "LockSupportMain");
main.setThread(thread);
thread.start();
Thread.sleep(2000);
main.unpark();
Thread.sleep(2000);
}
@Override
public void run() {
System.out.println(String.format("%s-步入run方法,執行緒名稱:%s", FORMATTER.format(LocalDateTime.now()),
Thread.currentThread().getName()));
LockSupport.park();
System.out.println(String.format("%s-解除阻塞,執行緒繼續執行,執行緒名稱:%s", FORMATTER.format(LocalDateTime.now()),
Thread.currentThread().getName()));
}
private void unpark() {
LockSupport.unpark(thread);
}
}
// 某個時刻的執行結果如下:
2019-02-25 00:39:57.780-步入run方法,執行緒名稱:LockSupportMain
2019-02-25 00:39:59.767-解除阻塞,執行緒繼續執行,執行緒名稱:LockSupportMain
```
`LockSupport`類`park()`方法也有帶超時的變體版本方法,遇到帶超時期限阻塞等待場景下不妨可以使用`LockSupport#parkNanos()`。
## 獨佔執行緒的儲存
`AbstractOwnableSynchronizer`是`AQS`的父類,一個同步器框架有可能在一個時刻被某一個執行緒獨佔,`AbstractOwnableSynchronizer`就是為所有的同步器實現和鎖相關實現提供了基礎的儲存、獲取和設定獨佔執行緒的功能,這個類的原始碼很簡單:
```java
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// 當前獨佔執行緒的瞬時例項 - 提供Getter和Setter方法
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
```
它就提供了一個儲存獨佔執行緒的變數對應的`Setter`和`Getter`方法,方法都是`final`修飾的,子類只能使用不能覆蓋。
## CLH佇列變體的實現
這裡先重點分析一下`AQS`中等待佇列的節點`AQS`的靜態內部類`Node`的原始碼:
```java
static final class Node {
// 標記一個節點處於共享模式下的等待
static final Node SHARED = new Node();
// 標記一個節點處於獨佔模式下的等待
static final Node EXCLUSIVE = null;
// 取消狀態
static final int CANCELLED = 1;
// 喚醒狀態
static final int SIGNAL = -1;
// 條件等待狀態
static final int CONDITION = -2;
// 傳播狀態
static final int PROPAGATE = -3;
// 等待狀態,初始值為0,其他可選值是上面的4個值
volatile int waitStatus;
// 當前節點前驅節點的引用
volatile Node prev;
// 當前節點後繼節點的引用
volatile Node next;
// 當前節點持有的執行緒,可能是阻塞中等待喚醒的執行緒
volatile Thread thread;
// 下一個等待節點
Node nextWaiter;
// 當前操作的節點是否處於共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取當前節點的前驅節點,確保前驅節點必須存在,否則丟擲NPE
final Node predecessor() {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 空節點,主要是首次建立佇列的時候建立的頭和尾節點使用
Node() {}
// 設定下一個等待節點,設定持有執行緒為當前執行緒
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
// 設定waitStatus,設定持有執行緒為當前執行緒
Node(int waitStatus) {
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
}
// CAS更新waitStatus
final boolean compareAndSetWaitStatus(int expect, int update) {
return WAITSTATUS.compareAndSet(this, expect, update);
}
// CAS設定後繼節點
final boolean compareAndSetNext(Node expect, Node update) {
return NEXT.compareAndSet(this, expect, update);
}
// 設定前驅節點
final void setPrevRelaxed(Node p) {
PREV.set(this, p);
}
// 下面是變數控制代碼的實現,在VarHandle出現之前使用的是Unsafe,其實底層還是照樣使用Unsafe
private static final VarHandle NEXT;
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Node.class, "next", Node.class);
PREV = l.findVarHandle(Node.class, "prev", Node.class);
THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
```
其中,變數控制代碼(`VarHandle`)是`JDK9`引入的新特性,其實底層依賴的還是`Unsafe`的方法,筆者認為可以簡單理解它為`Unsafe`的門面類,而定義的方法基本都是面向變數屬性的操作。這裡需要關注一下`Node`裡面的幾個屬性:
- `waitStatus`:當前`Node`例項的等待狀態,可選值有5個。
1. 初始值整數0:當前節點如果不指定初始化狀態值,預設值就是0,側面說明節點正在等待佇列中處於等待狀態。
2. `Node#CANCELLED`整數值1:表示當前節點例項因為超時或者執行緒中斷而被取消,等待中的節點永遠不會處於此狀態,被取消的節點中的執行緒例項不會阻塞。
3. `Node#SIGNAL`整數值-1:表示當前節點的後繼節點是(或即將是)阻塞的(通過`LockSupport#park()`),當它釋放或取消時,當前節點必須`LockSupport#unpark()`它的後繼節點。
4. `Node#CONDITION`整數值-2:表示當前節點是條件佇列中的一個節點,當它轉換為同步佇列中的節點的時候,狀態會被重新設定為0。
5. `Node#PROPAGATE`整數值-3:此狀態值通常只設置到呼叫了`doReleaseShared()`方法的頭節點,確保`releaseShared()`方法的呼叫可以傳播到其他的所有節點,簡單理解就是共享模式下節點釋放的傳遞標記。
- `prev`、`next`:當前`Node`例項的前驅節點引用和後繼節點引用。
- `thread`:當前`Node`例項持有的執行緒例項引用。
- `nextWaiter`:這個值是一個比較容易令人生疑的值,雖然表面上它稱為"下一個等待的節點",但是實際上它有三種取值的情況。
1. 值為靜態例項`Node.EXCLUSIVE`(也就是null),代表當前的`Node`例項是獨佔模式。
2. 值為靜態例項`Node.SHARED`,代表當前的`Node`例項是共享模式。
3. 值為非`Node.EXCLUSIVE`和`Node.SHARED`的其他節點例項,**代表Condition等待佇列中當前節點的下一個等待節點**。
`Node`類的等待狀態`waitStatus`理解起來是十分費勁的,下面分析`AQS`其他原始碼段的時候會**標識此狀態變化的時機**。
其實上面的`Node`類可以直接拷貝出來當成一個新建的類,然後嘗試構建一個雙向連結串列自行除錯,這樣子就能深刻它的資料結構。例如:
```java
public class AqsNode {
static final AqsNode SHARED = new AqsNode();
static final AqsNode EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile AqsNode prev;
volatile AqsNode next;
volatile Thread thread;
AqsNode nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final AqsNode predecessor() {
AqsNode p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
AqsNode() {
}
AqsNode(AqsNode nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
AqsNode(int waitStatus) {
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
}
final boolean compareAndSetWaitStatus(int expect, int update) {
return WAITSTATUS.compareAndSet(this, expect, update);
}
final boolean compareAndSetNext(AqsNode expect, AqsNode update) {
return NEXT.compareAndSet(this, expect, update);
}
final void setPrevRelaxed(AqsNode p) {
PREV.set(this, p);
}
private static final VarHandle NEXT;
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(AqsNode.class, "next", AqsNode.class);
PREV = l.findVarHandle(AqsNode.class, "prev", AqsNode.class);
THREAD = l.findVarHandle(AqsNode.class, "thread", Thread.class);
WAITSTATUS = l.findVarHandle(AqsNode.class, "waitStatus", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
public static void main(String[] args) throws Exception {
AqsNode head = new AqsNode();
AqsNode next = new AqsNode(AqsNode.EXCLUSIVE);
head.next = next;
next.prev = head;
AqsNode tail = new AqsNode(AqsNode.EXCLUSIVE);
next.next = tail;
tail.prev = next;
List threads = new ArrayList<>();
for (AqsNode node = head; node != null; node = node.next) {
threads.add(node.thread);
}
System.out.println(threads);
}
}
// 某次執行的輸出:
[null, Thread[main,5,main], Thread[main,5,main]]
```
實際上,`AQS`中一共存在兩種等待佇列,其中一種是普通的同步等待佇列,這裡命名為`Sync Queue`,另一種是基於`Sync Queue`實現的條件等待佇列,這裡命名為`Condition Queue`。
### 理解同步等待佇列
前面已經介紹完`AQS`的同步等待佇列節點類,下面重點分析一下同步等待佇列的相關原始碼,**下文的Sync佇列、Sync Queue、同步佇列和同步等待佇列是同一個東西**。首先,我們通過分析`Node`節點得知`Sync`佇列一定是雙向連結串列,`AQS`中有兩個瞬時成員變數用來存放頭節點和尾節點:
```java
// 頭節點引用(注意由transient volatile修飾,不會序列化,並且寫操作會馬上重新整理到主記憶體)
private transient volatile Node head;
// 尾節點引用(注意由transient volatile修飾,不會序列化,並且寫操作會馬上重新整理到主記憶體)
private transient volatile Node tail;
// 變數控制代碼相關,用於CAS操作頭尾節點
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
// 確保LockSupport類已經初始化 - 這裡應該是為了修復之前一個因為LockSupport未初始化導致的BUG
Class> ensureLoaded = LockSupport.class;
}
// 初始化同步佇列,注意初始化同步佇列的時候,頭尾節點都是指向同一個新的Node例項
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
// CAS設定同步佇列的尾節點
private final boolean compareAndSetTail(Node expect, Node update) {
return TAIL.compareAndSet(this, expect, update);
}
// 設定頭節點,重點注意這裡:傳入的節點設定成頭節點之後,前驅節點和持有的執行緒會置為null,這是因為:
// 1.頭節點一定沒有前驅節點。
// 2.當節點被設定為頭節點,它所在的執行緒一定是已經解除了阻塞。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
```
當前執行緒加入同步等待佇列和同步等待佇列的初始化是同一個方法,前文提到過:同步等待佇列的初始化會延遲到第一次可能出現競爭的情況,這是為了避免無謂的資源浪費,具體方法是`addWaiter(Node mode)`:
```java
// 新增等待節點到同步等待佇列,實際上初始化佇列也是這個方法完成的
private Node addWaiter(Node mode) {
// 基於當前執行緒建立一個新節點,節點的模式由呼叫者決定
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
// 尾節點不為空說明佇列已經初始化過,則把新節點加入到連結串列中,作為新的尾節點,建立和前驅節點的關聯關係
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 尾節點為空說明佇列尚未初始化過,進行一次初始化操作
initializeSyncQueue();
}
}
}
```
在首次呼叫`addWaiter()`方法,死迴圈至少執行兩輪再跳出,因為同步佇列必須初始化完成後(第一輪迴圈),然後再把當前執行緒所在的新節點例項新增到等待佇列中再返(第二輪迴圈)當前的節點,**這裡需要注意的是新加入同步等待佇列的節點一定是新增到佇列的尾部並且會更新`AQS`中的tail屬性為最新入隊的節點例項**。
假設我們使用`Node.EXCLUSIVE`模式把新增的等待執行緒加入佇列,例如有三個執行緒分別是`thread-1`、`thread-2`和`thread-3`,執行緒入隊的時候都處於阻塞狀態,模擬一下依次呼叫上面的入隊方法的同步佇列的整個連結串列的狀態。
先是執行緒`thread-1`加入等待佇列:
![j-a-q-s-ex-1](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-1.png)
接著是執行緒`thread-2`加入等待佇列:
![j-a-q-s-ex-2](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-2.png)
最後是執行緒`thread-3`加入等待佇列:
![j-a-q-s-ex-3](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-3.png)
如果仔細研究會發現,如果所有的入隊執行緒都處於阻塞狀態的話,新入隊的執行緒總是新增到佇列的`tail`節點,**阻塞的執行緒總是"爭搶"著成為`head`節點,這一點和`CLH`佇列鎖的阻塞執行緒總是基於前驅節點自旋以獲取鎖的思路是一致的**。下面將會分析的**獨佔模式與共享模式,執行緒加入等待佇列都是通過`addWaiter()`方法**。
### 理解條件等待佇列
前面已經相對詳細地介紹過同步等待佇列,在`AQS`中還存在另外一種相對特殊和複雜的等待佇列-**條件等待佇列**。介紹條件等待佇列之前,要先介紹`java.util.concurrent.locks.Condition`介面。
```java
public interface Condition {
// 當前執行緒進入等待狀態直到被喚醒或者中斷
void await() throws InterruptedException;
// 當前執行緒進入等待狀態,不響應中斷,阻塞直到被喚醒
void awaitUninterruptibly();
// 當前執行緒進入等待狀態直到被喚醒或者中斷,阻塞帶時間限制
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 當前執行緒進入等待狀態直到被喚醒或者中斷,阻塞帶時間限制
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 當前執行緒進入等待狀態直到被喚醒或者中斷,阻塞帶時間限制
boolean awaitUntil(Date deadline) throws InterruptedException;
// 喚醒單個阻塞執行緒
void signal();
// 喚醒所有阻塞執行緒
void signalAll();
}
```
`Condition`可以理解為`Object`中的`wait()`、`notify()`和`notifyAll()`的替代品,因為`Object`中的相應方法是`JNI`(`Native`)方法,由`JVM`實現,對使用者而言並不是十分友好(有可能伴隨`JVM`版本變更而受到影響),而`Condition`是基於資料結構和相應演算法實現對應的功能,我們可以從原始碼上分析其實現。
`Condition`的實現類是`AQS`的公有內部類`ConditionObject`。`ConditionObject`提供的入佇列方法如下:
```java
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */ - 條件佇列的第一個節點
private transient Node firstWaiter;
/** Last node of condition queue. */ - 條件佇列的最後一個節點
private transient Node lastWaiter;
// 公有建構函式
public ConditionObject() { }
// 新增條件等待節點
private Node addConditionWaiter() {
// 這裡做一次判斷,當前執行緒必須步入此同步器例項
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 臨時節點t賦值為lastWaiter引用
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 最後一個節點不為條件等待狀態,則是取消狀態
if (t != null && t.waitStatus != Node.CONDITION) {
// 解除所有取消等待的節點的連線
unlinkCancelledWaiters();
t = lastWaiter;
}
// 基於當前執行緒新建立一個條件等待型別的節點
Node node = new Node(Node.CONDITION);
// 首次建立Condition的時候,最後一個節點臨時引用t為null,則把第一個節點置為新建的節點
if (t == null)
firstWaiter = node;
else
// 已經存在第一個節點,則通過nextWaiter連線新的節點
t.nextWaiter = node;
// 最後一個節點的引用更新為新節點的引用
lastWaiter = node;
return node;
}
// 從條件等待佇列解除所有取消等待的節點的連線,其實就是所有取消節點移除的操作,涉及到雙向連結串列的斷鏈操作、第一個和最後一個節點的引用更新
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 注意這裡等待狀態的判斷
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 當前同步器例項持有的執行緒是否當前執行緒(currentThread())
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
// 暫時不分析其他方法
}
```
實際上,`Condition`的所有`await()`方法變體都呼叫`addConditionWaiter()`新增阻塞執行緒到條件佇列中。我們按照分析同步等待佇列的情況,分析一下條件等待佇列。正常情況下,假設有2個執行緒`thread-1`和`thread-2`進入條件等待佇列,都處於阻塞狀態。
先是`thread-1`進入條件佇列:
![j-a-q-s-ex-4](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-4.png)
然後是`thread-2`進入條件佇列:
![j-a-q-s-ex-5](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-5.png)
條件等待佇列看起來也並不複雜,但是它並不是單獨存在和使用的,一般依賴於同步等待佇列,下面的一節分析`Condition`的實現的時候再詳細分析。
## 獨佔模式與共享模式
前文提及到,同步器涉及到獨佔模型和共享模式。下面就針對這兩種模式詳細分析一下`AQS`的具體實現原始碼。
### 獨佔模式
`AQS`同步器如果使用獨佔(`EXCLUSIVE`)模式,那麼意味著同一個時刻,只有唯一的一個節點所線上程獲取(`acuqire`)原子狀態`status`成功,此時該執行緒可以從阻塞狀態解除繼續執行,而同步等待佇列中的其他節點持有的執行緒依然處於阻塞狀態。獨佔模式同步器的功能主要由下面的四個方法提供:
- `acquire(int arg)`:申請獲取`arg`個原子狀態`status`(申請成功可以簡單理解為`status = status - arg`)。
- `acquireInterruptibly(int arg)`:申請獲取`arg`個原子狀態`status`,響應執行緒中斷。
- `tryAcquireNanos(int arg, long nanosTimeout)`:申請獲取`arg`個原子狀態`status`,帶超時的版本。
- `release(int arg)`:釋放`arg`個原子狀態`status`(釋放成功可以簡單理解為`status = status + arg`)。
獨佔模式下,`AQS`同步器例項初始化時候傳入的`status`值,可以簡單理解為"允許申請的資源數量的上限值",下面的`acquire`型別的方法暫時稱為"獲取資源",而`release`方法暫時稱為"釋放資源"。接著我們分析前面提到的四個方法的原始碼,先看`acquire(int arg)`:
```java
public final void acquire(int arg) {
// 獲取資源成功或者新增一個獨佔型別節點到同步等待佇列成功則直接返回,否則中斷當前執行緒
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 此方法必須又子類覆蓋,用於決定是否獲取資源成功
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 中斷當前執行緒
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
// 不可中斷的獨佔模式下,同步等待佇列中的執行緒獲取資源的方法
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
// 獲取新入隊節點的前驅節點
final Node p = node.predecessor();
// 前驅節點為頭節點並且嘗試獲取資源成功,也就是每一輪迴圈都會呼叫tryAcquire嘗試獲取資源,除非阻塞或者跳出迴圈
if (p == head && tryAcquire(arg)) {
// 設定新入隊節點為頭節點,原來的節點會從佇列中斷開
setHead(node);
p.next = null; // help GC
return interrupted; // <== 注意,這個位置是跳出死迴圈的唯一位置
}
// 判斷是否需要阻塞當前獲取資源失敗的節點中持有的執行緒
if (shouldParkAfterFailedAcquire(p, node))
// 阻塞當前執行緒,如果被喚醒則返回並清空執行緒的中斷標記
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
/**
* 檢查並且更新獲取資源失敗的節點的狀態,返回值決定執行緒是否需要被阻塞。
* 這個方法是所有迴圈獲取資源方法中訊號控制的主要方法
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 這裡記住ws是當前處理節點的前驅節點的等待狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驅節點狀態設定成Node.SIGNAL成功,等待被release呼叫釋放,後繼節點可以安全地進入阻塞狀態
return true;
if (ws > 0) {
// ws大於0只有一種情況Node.CANCELLED,說明前驅節點已經取消獲取資源,
// 這個時候會把所有這型別取消的前驅節點移除,找到一個非取消的節點重新通過next引用連線當前節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 其他等待狀態直接修改前驅節點等待狀態為Node.SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
// 阻塞當前執行緒,獲取並且重置執行緒的中斷標記位
private final boolean parkAndCheckInterrupt() {
// 這個就是阻塞執行緒的實現,依賴Unsafe的API
LockSupport.park(this);
return Thread.interrupted();
}
```
上面的程式碼雖然看起來能基本理解,但是最好用圖推敲一下"空間上的變化":
![j-a-q-s-ex-6](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-6.png)
![j-a-q-s-ex-7](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-7.png)
接著分析一下`release(int arg)`的實現:
```java
// 釋放資源
public final boolean release(int arg) {
// 嘗試釋放資源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 嘗試釋放資源,獨佔模式下,嘗試通過重新設定status的值從而實現釋放資源的功能
// 這個方法必須由子類實現
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 解除傳入節點(一般是頭節點)的第一個後繼節點的阻塞狀態,當前處理節點的等待狀態會被CAS更新為0
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 當前處理的節點(一般是頭節點)狀態小於0則直接CAS更新為0
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 如果節點的第一個後繼節點為null或者等待狀態大於0(取消),則從等待佇列的尾節點向前遍歷,
// 找到最後一個不為null,並且等待狀態小於等於0的節點
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 解除上面的搜尋到的節點的阻塞狀態
if (s != null)
LockSupport.unpark(s.thread);
}
```
接著用上面的圖:
![j-a-q-s-8](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-8.png)
上面圖中`thread-2`晉升為頭節點的第一個後繼節點,等待下一個`release()`釋放資源喚醒之就能晉升為頭節點,一旦晉升為頭節點也就是意味著可以解除阻塞繼續執行。接著我們可以看`acquire()`的響應中斷版本和帶超時的版本。先看`acquireInterruptibly(int arg)`:
```java
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 獲取並且清空執行緒中斷標記位,如果是中斷狀態則直接拋InterruptedException異常
if (Thread.interrupted())
throw new InterruptedException();
// 如果獲取資源失敗
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 獨佔模式下響應中斷的獲取資源方法
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 基於當前執行緒新增一個獨佔的Node節點進入同步等待佇列中
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
// 獲取資源失敗進入阻塞狀態
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 解除阻塞後直接丟擲InterruptedException異常
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
```
`doAcquireInterruptibly(int arg)`方法和`acquire(int arg)`類似,最大的不同點在於阻塞執行緒解除阻塞後並不是正常繼續執行,而是直接丟擲`InterruptedException`異常。最後看`tryAcquireNanos(int arg, long nanosTimeout)`的實現:
```java
// 獨佔模式下嘗試在指定超時時間內獲取資源,響應執行緒中斷
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
// 獨佔模式下帶超時時間限制的獲取資源方法
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 超時期限小於0納秒,快速失敗
if (nanosTimeout <= 0L)
return false;
// 超時的最終期限是當前系統時鐘納秒+外部指定的nanosTimeout增量
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
// 計算出剩餘的超時時間
nanosTimeout = deadline - System.nanoTime();
// 剩餘超時時間小於0說明已經超時則取消獲取
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 這裡會判斷剩餘超時時間大於1000納秒的時候才會進行帶超時期限的執行緒阻塞,否則會進入下一輪獲取嘗試
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
```
`tryAcquireNanos(int arg, long nanosTimeout)`其實和`doAcquireInterruptibly(int arg)`類似,它們都響應執行緒中斷,不過`tryAcquireNanos()`在獲取資源的每一輪迴圈嘗試都會計算剩餘可用的超時時間,只有同時滿足獲取失敗需要阻塞並且剩餘超時時間大於`SPIN_FOR_TIMEOUT_THRESHOLD(1000納秒)`的情況下才會進行阻塞。
獨佔模式的同步器的一個顯著特點就是:頭節點的第一個有效(非取消)的後繼節點,總是嘗試獲取資源,一旦獲取資源成功就會解除阻塞並且晉升為頭節點,原來所在節點會移除出同步等待佇列,原來的佇列長度就會減少1,然後頭結點的第一個有效的後繼節點繼續開始競爭資源。
![j-a-q-s-ex-9](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-9.png)
使用獨佔模式同步器的主要類庫有:
- 可重入鎖`ReentrantLock`。
- 讀寫鎖`ReentrantReadWriteLock`中的寫鎖`WriteLock`。
### 共享模式
共享(`SHARED`)模式中的"共享"的含義是:同一個時刻,如果有一個節點所線上程獲取(`acuqire`)原子狀態`status`成功,那麼它會解除阻塞被喚醒,並且會把**喚醒狀態傳播**到所有有效的後繼節點(換言之就是喚醒整個同步等待佇列中的所有有效的節點)。共享模式同步器的功能主要由下面的四個方法提供:
- `acquireShared(int arg)`:申請獲取`arg`個原子狀態`status`(申請成功可以簡單理解為`status = status - arg`)。
- `acquireSharedInterruptibly(int arg)`:申請獲取`arg`個原子狀態`status`,響應執行緒中斷。
- `tryAcquireSharedNanos(int arg, long nanosTimeout)`:申請獲取`arg`個原子狀態`status`,帶超時的版本。
- `releaseShared(int arg)`:釋放`arg`個原子狀態`status`(釋放成功可以簡單理解為`status = status + arg`)。
先看`acquireShared(int arg)`的原始碼:
```java
// 共享模式下獲取資源
public final void acquireShared(int arg) {
// 注意tryAcquireShared方法值為整型,只有小於0的時候才會加入同步等待佇列
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 共享模式下嘗試獲取資源,此方法需要由子類覆蓋
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下獲取資源和處理同步等待佇列的方法
private void doAcquireShared(int arg) {
// 基於當前執行緒新建一個標記為共享的新節點
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
// 如果當前節點的前驅節點是頭節點
if (p == head) {
// 每一輪迴圈都會呼叫tryAcquireShared嘗試獲取資源,除非阻塞或者跳出迴圈
int r = tryAcquireShared(arg);
if (r > = 0) { // <= tryAcquireShared方法>=0說明直資源獲取成功
// 設定頭結點,並且傳播獲取資源成功的狀態,這個方法的作用是確保喚醒狀態傳播到所有的後繼節點
// 然後任意一個節點晉升為頭節點都會喚醒其第一個有效的後繼節點,起到一個鏈式釋放和解除阻塞的動作
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 判斷獲取資源失敗是否需要阻塞,這裡會把前驅節點的等待狀態CAS更新為Node.SIGNAL
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
// 設定同步等待佇列的頭節點,判斷當前處理的節點的後繼節點是否共享模式的節點,如果共享模式的節點,
// propagate大於0或者節點的waitStatus為PROPAGATE則進行共享模式下的釋放資源
private void setHeadAndPropagate(Node node, int propagate) {
// h為頭節點的中間變數
Node h = head;
// 設定當前處理節點為頭節點
setHead(node);
// 這個判斷條件比較複雜:入參propagate大於0 || 頭節點為null || 頭節點的狀態為非取消 || 再次獲取頭節點為null || 再次獲取頭節點不為取消
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 當前節點(其實已經成為頭節點)的第一個後繼節點為null或者是共享模式的節點
if (s == null || s.isShared())
doReleaseShared();
}
}
// Release action for shared mode:共享模式下的釋放資源動作
private void doReleaseShared() {
for (;;) {
Node h = head;
// 頭節點不為null並且不為尾節點
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭節點等待狀態為SIGNAL(-1)則CAS更新它為0,更新成功後喚醒和解除其後繼節點的阻塞
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue;
// 喚醒頭節點的後繼節點
unparkSuccessor(h);
}
// 如果頭節點的等待狀態為0,則CAS更新它為PROPAGATE(-3)
else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
// 頭節點沒有變更,則跳出迴圈
if (h == head)
break;
}
}
```
其實程式碼的實現和獨佔模式有很多類似的地方,一個很大的不同點是:共享模式同步器當節點獲取資源成功晉升為頭節點之後,它會把自身的等待狀態通過`CAS`更新為`Node.PROPAGATE`,下一個加入等待佇列的新節點會把頭節點的等待狀態值更新回`Node.SIGNAL`,標記後繼節點處於可以被喚醒的狀態,如果遇上資源釋放,那麼這個阻塞的節點就能被喚醒從而解除阻塞。我們還是畫圖理解一下,先假設`tryAcquireShared(int arg)`總是返回小於0的值,入隊兩個阻塞的執行緒`thread-1`和`thread-2`,然後進行資源釋放確保`tryAcquireShared(int arg)`總是返回大於0的值:
![j-a-q-s-ex-10](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-10.png)
看起來和獨佔模式下的同步等待佇列差不多,實際上真正不同的地方在於有節點解除阻塞和晉升為頭節點的過程。因此我們可以先看`releaseShared(int arg)`的原始碼:
```java
// 共享模式下釋放資源
public final boolean releaseShared(int arg) {
// 嘗試釋放資源成功則呼叫前面分析過的doReleaseShared以傳播喚醒狀態和unpark頭節點的後繼節點
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 共享模式下嘗試釋放資源,必須由子類覆蓋
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
```
`releaseShared(int arg)`就是在`tryReleaseShared(int arg)`呼叫返回`true`的情況下主動呼叫一次`doReleaseShared()`從而基於頭節點傳播喚醒狀態和`unpark`頭節點的後繼節點。接著之前的圖:
![j-a-q-s-ex-11](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-11.png)
![j-a-q-s-ex-12](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-12.png)
接著看`acquireSharedInterruptibly(int arg)`的原始碼實現:
```java
// 共享模式下獲取資源的方法,響應執行緒中斷
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 和非響應執行緒中斷的acquireShared方法類似,不過這裡解除阻塞之後直接丟擲異常InterruptedException
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
```
最後看`tryAcquireSharedNanos(int arg, long nanosTimeout)`的原始碼實現:
```java
// 共享模式下獲取資源的方法,帶超時時間版本
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 注意這裡只要tryAcquireShared >= 0或者doAcquireSharedNanos返回true都認為獲取資源成功
return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 計算超時的最終期限
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
//重新計算剩餘的超時時間
nanosTimeout = deadline - System.nanoTime();
// 超時的情況下直接取消獲取
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 滿足阻塞狀態並且剩餘的超時時間大於閥值1000納秒則通過LockSupport.parkNanos()阻塞執行緒
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 解除阻塞後判斷執行緒的中斷標記並且清空標記位,如果是處於中斷狀態則丟擲InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
```
共享模式的同步器的一個顯著特點就是:頭節點的第一個有效(非取消)的後繼節點,總是嘗試獲取資源,一旦獲取資源成功就會解除阻塞並且晉升為頭節點,原來所在節點會移除出同步等待佇列,原來的佇列長度就會減少1,重新設定頭節點的過程會傳播喚醒的狀態,簡單來說就是喚醒一個有效的後繼節點,只要一個節點可以晉升為頭節點,它的後繼節點就能被喚醒,以此類推。**節點的喚醒順序遵循類似於FIFO的原則,通俗說就是先阻塞或者阻塞時間最長則先被喚醒**。
![j-a-q-s-ex-13](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-13.png)
使用共享模式同步器的主要類庫有:
- 訊號量`Semaphore`。
- 倒數柵欄`CountDownLatch`。
## Condition的實現
`Condition`例項的建立是在`Lock`介面的`newCondition()`方法,它是鎖條件等待的實現,基於作用或者語義可以見`Condition`介面的相關API註釋:
> Condition是物件監視器鎖方法Object#wait()、Object#notify()和Object#notifyAll()的替代實現,物件監視器鎖實現鎖的時候作用的效果是每個鎖物件必須使用多個wait-set(JVM內建的等待佇列),通過Object提供的方法和監視器鎖結合使用就能達到Lock的實現效果。如果替換synchronized方法和語句並且結合使用Lock和Condition,就能替換並且達到物件監視器鎖的效果。
`Condition`必須固有地繫結在一個`Lock`的實現類上,也就是要通過`Lock`的例項建立`Condition`例項,而且`Condition`的方法呼叫使用必須在`Lock`的"鎖定程式碼塊"中,這一點和`synchronized`關鍵字以及`Object`的相關JNI方法使用的情況十分相似。
前文介紹過`Condition`介面提供的方法以及`Condition`佇列,也就是條件等待佇列,通過畫圖簡單介紹了它的佇列節點組成。實際上,條件等待佇列需要結合同步等待佇列使用,這也剛好對應於**前面提到的`Condition`的方法呼叫使用必須在`Lock`的鎖定程式碼塊中**。聽起來很懵逼,我們慢慢分析一下`ConditionObject`的方法原始碼就能知道具體的原因。
先看`ConditionObject#await()`方法:
```java
// 退出等待後主動進行中斷當前執行緒
private static final int REINTERRUPT = 1;
// 退出等待後丟擲InterruptedException異常
private static final int THROW_IE = -1;
/**
* 可中斷的條件等待實現
* 1、當前執行緒處於中斷狀態則丟擲InterruptedException
* 2、儲存getState返回的鎖狀態,並且使用此鎖狀態呼叫release釋放所有的阻塞執行緒
* 3、執行緒加入等待佇列進行阻塞,直到signall或者中斷
* 4、通過儲存getState返回的鎖狀態呼叫acquire方法
* 5、第4步中阻塞過程中中斷則丟擲InterruptedException
*/
public final void await() throws InterruptedException {
// 如果執行緒是中斷狀態則清空中斷標記位並且丟擲InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 當前執行緒所在的新節點加入條件等待佇列
Node node = addConditionWaiter();
// 釋放當前AQS中的所有資源返回資源的status儲存值,也就是基於status的值呼叫release(status) - 其實這一步是解鎖操作
int savedState = fullyRelease(node);
// 初始化中斷模式
int interruptMode = 0;
// 如果節點新建的節點不位於同步佇列中(理論上應該是一定不存在),則對節點所線上程進行阻塞,第二輪迴圈理論上節點一定在同步等待佇列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 處理節點所線上程中斷的轉換操作
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 節點所線上程被喚醒後,如果節點所線上程沒有處於中斷狀態,則以獨佔模式進行頭節點競爭
// 注意這裡使用的status是前面釋放資源時候返回的儲存下來的status
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 下一個等待節點不空,則從等待佇列中移除所有取消的等待節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// interruptMode不為0則按照中斷模式進行不同的處理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 釋放當前AQS中的所有資源,其實也就是基於status的值呼叫release(status)
// 這一步對於鎖實現來說,就是一個解鎖操作
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
// 釋放失敗則標記等待狀態為取消
node.waitStatus = Node.CANCELLED;
throw t;
}
}
// 傳入的節點是否在同步佇列中
final boolean isOnSyncQueue(Node node) {
// 節點等待您狀態為CONDITION或者前驅節點為null則返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 因為等待佇列是通過nextWaiter連線,next引用存在說明節點位於同步佇列
if (node.next != null)
return true;
// 從同步佇列的尾部向前遍歷是否存在傳入的節點例項
return findNodeFromTail(node);
}
// 從同步佇列的尾部向前遍歷是否存在傳入的節點例項
private boolean findNodeFromTail(Node node) {
for (Node p = tail;;) {
if (p == node)
return true;
if (p == null)
return false;
p = p.prev;
}
}
// 這是一個很複雜的判斷,用了兩個三目表示式,作用是如果新建的等待節點所線上程中斷,
// 則把節點的狀態由CONDITION更新為0,並且加入到同步等待佇列中,返回THROW_IE中斷狀態,如果加入同步佇列失敗,返回REINTERRUPT
// 如果新建的等待節點所線上程沒有中斷,返回0,也就是初始狀態的interruptMode
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 節點執行緒中斷取消等待後的轉換操作
final boolean transferAfterCancelledWait(Node node) {
// CAS更新節點的狀態由CONDITION更改為0
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
// 節點加入同步等待佇列
enq(node);
return true;
}
// 這裡嘗試自旋,直到節點加入同步等待佇列成功
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
// 等待完畢後報告中斷處理,前邊的邏輯得到的interruptMode如果為THROW_IE則丟擲InterruptedException,如果為REINTERRUPT則中斷當前執行緒
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
```
其實上面的`await()`邏輯並不複雜,前提是理解了物件監視器鎖那套等待和喚醒的機制(由`JVM`實現,`C`語言學得好的可以去看下原始碼),這裡只是通過演算法和資料結構重新進行了一次實現。`await()`主要使用了兩個佇列:同步等待佇列和條件等待佇列。我們先假設有兩個執行緒`thread-1`和`thread-2`呼叫了下面的程式碼中的`process()`方法:
```java
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void process(){
try{
lock.lock();
condition.await();
// 省略其他邏輯...
}finally{
lock.unlock();
}
}
```
`ReentrantLock`使用的是`AQS`獨佔模式的實現,因此在呼叫`lock()`方法的時候,同步等待佇列的一個瞬時快照(假設執行緒`thread-1`先加入同步等待佇列)可能如下:
![j-a-q-s-ex-14](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-14.png)
接著,執行緒`thread-1`所在節點是頭節點的後繼節點,獲取鎖成功,它解除阻塞後可以呼叫`await()`方法,這個時候會釋放同步等待佇列中的所有等待節點,也就是執行緒`thread-2`所在的節點也被釋放,因此執行緒`thread-2`也會呼叫`await()`方法:
![j-a-q-s-ex-15](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-15.png)
只要有執行緒能夠到達`await()`方法,**那麼原來的同步器中的同步等待佇列就會釋放所有阻塞節點,表現為釋放鎖,然後這些釋放掉的節點會加入到條件等待佇列中,條件等待佇列中的節點也是阻塞的**,這個時候只有通過`signal()`或者`signalAll()`進行**佇列元素轉移**才有機會喚醒阻塞的執行緒。因此接著看`signal()`和`signalAll()`的原始碼實現:
```java
// 從等待佇列中移動一個等待時間最長的執行緒(如果過存在的話)到鎖同步等待佇列中
public final void signal() {
// 判斷當前執行緒是否和獨佔執行緒一致,其實就是此操作需要在鎖程式碼塊中執行
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 基於第一個等待節點進行Signal操作
private void doSignal(Node first) {
do {
// 首節點的下一個等待節點為空,說明只剩下一個等待節點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 當前處理節點從連結串列從移除
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// 喚醒的轉換操作
final boolean transferForSignal(Node node) {
// CAS更新節點狀態由CONDITION到0,更新失敗則返回false不喚醒
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
// 節點作為新節點重新加入到同步等待佇列
Node p = enq(node);
int ws = p.waitStatus;
// 取消或者更新節點等待狀態為SIGNAL的節點需要解除阻塞進行重新同步,這裡的操作只針對取消和狀態異常的節點
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
// 從等待佇列中移動所有等待時間最長的執行緒(如果過存在的話)到鎖同步等待佇列中
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 基於第一個等待節點進行SignalAll操作
private void doSignalAll(Node first) {
// 置空lastWaiter和firstWaiter
lastWaiter = firstWaiter = null;
do {
// 獲取下一個等待節點
Node next = first.nextWaiter;
// 當前處理節點從連結串列從移除
first.nextWaiter = null;
// 處理當前節點
transferForSignal(first);
// 更新中間引用
first = next;
} while (first != null);
}
```
其實`signal()`或者`signalAll()`會對取消的節點或者短暫中間狀態的節點進行解除阻塞,但是正常情況下,它們的操作結果是把阻塞等待時間最長的一個或者所有節點重新加入到`AQS`的同步等待佇列中。例如,上面的例子呼叫`signal()`方法後如下:
![j-a-q-s-ex-16](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202007/j-a-q-s-ex-16.png)
這樣子,相當於執行緒`thread-1`重新加入到`AQS`同步等待佇列中(從條件等待佇列中移動到同步等待佇列中),並且開始競爭頭節點,一旦競爭成功,就能夠解除阻塞。這個時候從邏輯上看,`signal()`方法最終解除了對執行緒`thread-1`的阻塞。`await()`的其他變體方法的原理是類似的,這裡因為篇幅原因不再展開。這裡小結一下`Condition`的顯著特點:
- 1、同時依賴兩個同步等待佇列,一個是`AQS`提供,另一個是`ConditionObject`提供的。
- 2、`await()`方法會釋放`AQS`同步等待佇列中的阻塞節點,這些節點會加入到條件等待佇列中進行阻塞。
- 3、`signal()`或者`signalAll()`會把條件等待佇列中的節點重新加入`AQS`同步等待佇列中,並不解除正常節點的阻塞狀態。
- 4、接第3步,這些進入到`AQS`同步等待佇列的節點會重新競爭成為頭節點,接下來的步驟其實也就是前面分析過的獨佔模式下的`AQS`的運作原理。
## 取消獲取資源(cancelAcquire)
新節點加入等待佇列失敗導致任何型別的異常或者帶超時版本的API呼叫的時候剩餘超時時間小於等於零的時候,就會呼叫`cancelAcquire()`方法,用於取消該節點對應節點獲取資源的操作。
```java
// 取消節點獲取資源的操作
private void cancelAcquire(Node node) {
// 節點為null直接返回
if (node == null)
return;
// 置空節點持有的執行緒,因為此時節點執行緒已經發生中斷
node.thread = null;
Node pred = node.prev;
// 這個迴圈是為了獲取當前節點的上一個不為取消狀態的節點,也就是中間如果發生了取消的節點都直接斷開
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 儲存當前節點的上一個不為取消狀態的節點的後繼節點
Node predNext = pred.next;
// 當前節點等待狀態更新為CANCELLED
node.waitStatus = Node.CANCELLED;