【9036期】JUC多執行緒---AQS抽象佇列同步器原理
AQS 的工作原理
什麼是 AQS
AQS,Abstract Queued Synchronizer,抽象佇列同步器,是 J.U.C 中實現鎖及同步元件的基礎。工作原理就是如果被請求的共享資源空閒,則將當前請求資源的執行緒設定為有效的工作執行緒,並且將共享資源設定為鎖定狀態,如果被請求的共享資源被佔用,那麼就將獲取不到鎖的執行緒加入到等待佇列中。這時,就需要一套執行緒阻塞等待以及被喚醒時的鎖分配機制,而 AQS 是通過 CLH 佇列實現鎖分配的機制。
CLH 同步佇列的模型
CLH 佇列是由內部類 Node 構成的同步佇列,是一個雙向佇列(不存在佇列例項,僅存在節點之間的關聯關係),將請求共享資源的執行緒封裝成 Node 節點來實現鎖的分配;同時利用內部類 ConditionObject 構建等待佇列,當呼叫 ConditionObject 的 await() 方法後,執行緒將會加入等待佇列中,當呼叫 ConditionObject 的 signal() 方法後,執行緒將從等待佇列轉移動到同步佇列中進行鎖競爭。AQS 中只能存在一個同步佇列,但可擁有多個等待佇列。AQS 的 CLH 同步佇列的模型如下圖:
AQS 有三個主要變數,分別是 head、tail、state,其中 head 指向同步佇列的頭部,注意 head 為空結點,不儲存資訊。而 tail 則是同步佇列的隊尾,同步佇列採用的是雙向連結串列的結構是為了方便對佇列進行查詢操作。當 Node 節點被設定為 head 後,其 thread 資訊和前驅結點將被清空,因為該執行緒已獲取到同步狀態,正在執行了,也就沒有必要儲存相關資訊了,head 只儲存後繼結點的指標即可,便於 head 結點釋放同步狀態後喚醒後繼結點。
佇列的入隊和出隊操作都是無鎖操作,基於 CAS+自旋鎖 實現,AQS 維護了一個 volatile 修飾的 int 型別的 state 同步狀態,volatile 保證執行緒之間的可見性,並通過 CAS 對該同步狀態進行原子操作、實現對其值的修改。當 state=0 時,表示沒有任何執行緒佔有共享資源的鎖,當 state=1 時,則說明當前有執行緒正在使用共享變數,其他執行緒必須加入同步佇列進行等待;
內部類 Node 資料結構分析
static final class Node { //共享模式 static final Node SHARED = new Node(); //獨佔模式 static final Node EXCLUSIVE = null; //標識執行緒已處於結束狀態 static final int CANCELLED = 1; //等待被喚醒狀態 static final int SIGNAL = -1; //條件狀態 static final int CONDITION = -2; //在共享模式中使用表示獲得的同步狀態會被傳播 static final int PROPAGATE = -3; //等待狀態,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4種取值 volatile int waitStatus; //同步佇列中前驅結點 volatile Node prev; //同步佇列中後繼結點 volatile Node next; //請求鎖的執行緒 volatile Thread thread; //等待佇列中的後繼結點,這個與Condition有關,稍後會分析 Node nextWaiter; //判斷是否為共享模式 final boolean isShared() { return nextWaiter == SHARED; } //..... }
AQS分為兩種模式:獨佔模式 EXCLUSIVE 和 共享模式 SHARED,像 ReentrantLock、CyclicBarrier 是基於獨佔模式模式實現的,Semaphore,CountDownLatch 等是基於共享模式。
變數 waitStatus 表示當前封裝成 Node 節點的執行緒的等待狀態,共有4種取值 CANCELLED、SIGNAL、CONDITION、PROPAGATE:
- CANCELLED:值為1,表示在同步佇列中的執行緒等待超時或者被中斷,處於已結束狀態,需要從同步佇列中移除該 Node 節點
- SIGNAL:值為-1,表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為 SIGNAL,當該節點釋放了同步鎖之後,就會喚醒該節點的後繼節點
- CONDITION:值為-2,與 Condition 相關,表示該結點在 condition 等待佇列中阻塞,當其他執行緒呼叫了Condition 的 signal() 方法後,CONDITION 狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖。
- PROPAGATE:值為-3時,在共享模式下使用,表示該執行緒以及後繼執行緒進行無條件傳播。前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。
AQS 的設計模式
AQS 的模板方法模式
AQS 的基於模板方法模式設計的,在 AQS 抽象類中已經實現了執行緒在等待佇列的維護方式(如獲取資源失敗入隊/喚醒出隊等),而對於具體共享資源 state 的獲取與釋放(也就是鎖的獲取和釋放)則交由具體的同步器來實現,具體的同步器需要實現以下幾種方法:
- isHeldExclusively():該執行緒是否正在獨佔資源,只有用到 condition 才需要去實現它
- tryAcquire(int):獨佔模式,嘗試獲取資源,成功則返回 true,失敗則返回 false
- tryRelease(int):獨佔方式,嘗試釋放資源,成功則返回 true,失敗則返回 false
- tryAcquireShared(int):共享方式,嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源
- tryReleaseShared(int):共享方式,嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false
JUC 中提供的同步器
- 閉鎖 CountDownLatch:用於讓主執行緒等待一組事件全部發生後繼續執行。
- 柵欄 CyclicBarrier:用於等待其它執行緒,且會阻塞自己當前執行緒,所有執行緒必須全部到達柵欄位置後,才能繼續執行;且在所有執行緒到達柵欄處之後,可以觸發執行另外一個預先設定的執行緒。
- 訊號量 Semaphore:用於控制訪問資源的執行緒個數,常常用於實現資源池,如資料庫連線池,執行緒池。在 Semaphore 中,acquire 方法用於獲取資源,有的話,繼續執行
- 沒有資源的話將阻塞直到有其它執行緒呼叫 release 方法釋放資源;
- 交換器 Exchanger:用於執行緒之間進行資料交換;當兩個執行緒都到達共同的同步點(都執行到exchanger.exchange 的時刻)時,發生資料交換,否則會等待直到其它執行緒到達;
CountDownLatch 和 CyclicBarrier 的區別?
兩者都可以用來表示程式碼執行到某個點上,二者的區別在於:
① CyclicBarrier 的某個執行緒執行到某個位置之後就停止執行,直到所有的執行緒都到達了這個點,所有執行緒才重新執行;CountDownLatch 的某執行緒執行到某個位置之後,只是給計數值-1而已,該執行緒繼續執行;
② CyclicBarrier 可重用,CountDownLatch 不可重用,計數值 為 0 時該 CountDownLatch 就不可再用了。
ReentranLock 中獨佔模式下非公平鎖的獲取流程
獲取獨佔鎖的過程是定義在 tryAcquire() 中的,當前執行緒嘗試獲取同步狀態,如果獲取失敗,就將執行緒封裝成 Node 節點插入到 CLH 同步佇列中。插入同步佇列後,執行緒並沒有放棄獲取同步狀態,而是根據前置節點狀態狀態判斷是否繼續獲取,如果前置節點是 head 結點,繼續嘗試獲取,否則就將執行緒掛起。如果成功獲取同步狀態則將自己設定為 head 結點。當持有同步狀態的執行緒釋放資源後,也會喚醒佇列中的後繼執行緒。
ConditionObject 阻塞佇列
什麼是 Condition 介面
AQS 的阻塞佇列是基於內部類 ConditionObject 實現的,而 ConditionObject 實現了 Condition 介面。那 Condition 介面是什麼呢?Condition 主要用於執行緒的等待和喚醒,在JDK5之前,執行緒的等待喚醒是用 Object 類的 wait/notify/notifyAll 方法實現的,這些方法必須配合 synchronized 關鍵字使用,使用起來不是很方便,為了解決這個問題,在 JDK5 之後,J.U.C 提供了Condition。
- Condition.await 對應於 Object.wait;
- Condition.signal 對應於 Object.notify;
- Condition.signalAll 對應於 Object.notifyAll;
與 synchronized 的等待喚醒機制相比,Condition 能夠精細的控制多執行緒的休眠與喚醒,具備更多的靈活性, 通過多個 Condition 例項物件建立不同的等待佇列,從而實現同一個鎖擁有多個等待佇列。而 synchronized 關鍵字只能有一組等待喚醒佇列,使用 notify() 喚醒執行緒時只能隨機喚醒佇列中的一個執行緒。
ConditionObject 阻塞佇列實現原理
Condition 的具體實現之一是 AQS 的內部類 ConditionObject,每個 Condition 都對應著一個等待佇列,也就是說如果一個鎖上建立了多個 Condition 物件,那麼也就存在多個等待佇列。當呼叫 ConditionObject 的 await() 方法後,執行緒將會加入等待佇列中,當呼叫 ConditionObject 的 signal() 方法後,執行緒將從等待佇列轉移動同步佇列中進行鎖競爭。AQS 的 ConditionObject 中的等待佇列模型如下:
AQS 的執行緒喚醒機制原理
AQS 的執行緒喚醒是通過 singal() 方法實現的,我們先看下 singal() 方法執行緒喚醒的流程圖:
signal() 方法主要呼叫了 doSignal(),而 doSignal() 方法中做了兩件事:
(1)從條件等待佇列移除被喚醒的節點,然後重新維護條件等待佇列的 firstWaiter 和 lastWaiter 的指向。
(2)將從等待佇列移除的結點加入同步佇列(在 transferForSignal() 方法中完成的),如果進入到同步佇列失敗並且條件等待佇列還有不為空的節點,則繼續迴圈喚醒後續其他結點的執行緒。注意:無論是同步佇列還是等待佇列,使用的 Node 資料結構都是同一個,不過是使用的內部變數不同罷了
所以 signal() 的流程可以概述為:
- signal() 被呼叫後,先判斷當前執行緒是否持有獨佔鎖
- 如果有,那麼喚醒當前 Condition 等待佇列的第一個結點的執行緒,並從等待佇列中移除該結點,新增到同步佇列中
- 如果加入同步佇列失敗,那麼繼續迴圈喚醒等待佇列中的其他結點的執行緒
- 如果成功加入同步佇列,那麼如果其前驅結點已結束或者設定前驅節點狀態為 Node.SIGNAL 狀態失敗,則通過 LockSupport.unpark() 喚醒被通知節點代表的執行緒。
到此 signal() 任務完成,被喚醒後的執行緒,將呼叫 AQS 的 acquireQueued() 方法加入獲取同步狀態的競爭中,這就是等待喚醒機制的整個流程實現原理。
吃水不忘挖井人: |