AQS抽象同步器的核心原理與實踐
基於CAS自旋實現的輕量級鎖有兩個問題:
(1)CAS空自旋會浪費大量CPU資源。
(2)在CMP架構的CPU會導致“匯流排風暴”。
解決CAS空自旋的有效方式之一是以空間換時間,比較常見的方案由兩種:分散操作和熱點和使用佇列削峰。JUC使用的是佇列削峰的方案解決CAS效能問題(LongAdder是分散熱點),它提供了一個雙向佇列的削峰基類——抽象基礎類AbstractQueuedSynchronizer(AQS)。
鎖與佇列的關係
1.CLH鎖的內部佇列
CLH自旋鎖使用的CLH是一個單向佇列,也是一個FIFO佇列。在獨佔鎖中,競爭資源在一個時間點只能被一個執行緒鎖訪問,佇列頭部的節點表示佔有鎖的節點,新加入的搶鎖執行緒需要等待,會插入佇列的尾部。
2.分散式鎖的內部佇列
在分散式鎖的實現中,以ZooKeeper的分散式鎖為例,就是建立臨時節點,順序執行。
3.AQS的內部佇列
AQS是JUC提供的一個用於構建鎖和同步容器的基礎類。例如ReentrantLock、Semaphore、CountDownLatch、FutureTask等都是基於AQS構建的。AQS解決了實現同步容器時設計的大量細節問題。
AQS是CLH佇列的一個變種。AQS佇列內部維護的是一個FIFO的雙向連結串列,每個節點有前驅結點和後繼節點。每個節點由執行緒封裝,當執行緒爭搶鎖失敗後會封裝成節點加入AQS佇列中;當獲取鎖的執行緒釋放鎖以後,會從佇列中喚醒一個阻塞的節點(執行緒)。
AQS的核心成員
AQS出於“分離變與不變”(人話:單一職責和開閉原則)的原則,基於模版模式實現。AQS為鎖獲取、鎖釋放的排隊和出隊過程提供了一系列的模版方法。由於JUC的顯式鎖種類豐富,因此AQS將不同鎖的具體操作抽取為鉤子方法,讓各種鎖的子類去實現。
狀態標誌位
AQS中維持了一個單一的volatile變數state,state表示鎖的狀態。
private volatile int state;
state保證了可見性,所以任何執行緒通過getState()獲取狀態都可以得到最新值。AQS提供了compareAndSetState()方法利用底層UnSafe的CAS機制來實現原子性。
protected final boolean compareAndSetState(int exepect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
以ReentrantLock為例,state初始化為0,表示未鎖定。A執行緒執行該鎖的lock()操作時,會呼叫tryAcquire獨佔該鎖並將state加1。此後,其他執行緒再tryAcquire()時就會失敗,直到A執行緒unlock()到state=0為止,其他執行緒才有機會獲取該鎖。當然,釋放鎖之前,A執行緒自己是可以重複獲取鎖的(state會累加),這就是可重入。但是,獲取多少次就要釋放多少次,這樣才能保證state回到零態。
AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,這個基類只有一個變數exclusiveOwnerThread,表示當前佔用該鎖的執行緒,並且提供了get和set方法。
佇列節點類
Node
FIFO雙向同步佇列
每當執行緒通過AQS獲取鎖失敗時,執行緒將被封裝成一個Node節點,通過CAS原子操作插入佇列尾部。當有執行緒釋放鎖時,AQS會嘗試讓隊頭的後繼節點佔用鎖。
JUC顯式鎖與AQS的關係
AQS是一個同步器,它實現了鎖的基本抽象功能,該類是由模版模式來實現的。
1.ReentrantLock與AQS的組合關係
ReentrantLock是一個可重入的互斥鎖,可以被單個執行緒多次獲取。ReentrantLock把所有Lock介面的操作都委派到一個Sync類上,該類繼承了AbstractQueuedSynchronizer:
static abstract class Sync extends AbstractQueuedSynchronizer { ... }
ReentrantLock支援公平鎖和非公平鎖。預設情況下是非公平鎖。
final static class NonfairSync extends Sync { ... }
final static class FairSync extends Sync { ... }
由ReentrantLock的lock和unlock的原始碼可以看到,它們只是分別呼叫了sync物件的lock和release方法。
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
而Sync內部類只是AQS的子類,所以本質是ReentrantLock的操作是委託給AQS完成的。
AQS的模版流程
AQS定義了兩種資源共享方式:
- Exclusive(獨享鎖):只有一個執行緒能佔有鎖資源,如ReentrantLock。
- share(共享鎖):多個執行緒可以同時佔有資源,如Semaphore、CountDownLatch。
AQS為不同的資源共享方式提供了不同的模版流程,AQS提供了一種實現阻塞鎖和依賴FIFO等待佇列的同步器的框架。自定義的同步器只需要實現共享資源state的獲取與釋放方式即可,這些邏輯都編寫在鉤子方法中。無論是共享鎖還是獨佔鎖,AQS在執行模版流程時都會回撥自定義的鉤子方法。
AQS的鉤子方法
自定義同步器時,AQS中需要重寫的鉤子方法如下:
- tryAcquire(int):獨佔鎖鉤子,嘗試獲取資源,若成功則返回true,若失敗則返回false。
- tryRekease(int):獨佔鎖鉤子,嘗試釋放資源,若成功則返回true,若失敗則返回false。
- tryAcquireShared(int):共享鎖鉤子,嘗試獲取資源,負數表示失敗;
- isHeldExclusively():獨佔鎖鉤子,判斷該執行緒是否正在獨佔資源。只有用到condition條件佇列時才需要去實現它。
通過AQS實現簡單的獨佔鎖
SimpleMockLock只實現了Lock介面的兩個方法:
(1)lock方法:完成顯式鎖的搶佔。
(2)unlock方法:完成顯式鎖的釋放。
SimpleMockLock的鎖搶佔和釋放是委託給Sync例項的方法來實現的。在搶佔鎖時,AQS的acquire會呼叫tryAcquire鉤子方法;釋放鎖時,AQS的release會呼叫tryRelease鉤子方法。
內部類Sync繼承AQS類時提供了一下兩個鉤子方法的實現:
(1)tryAcquire:將state設定為1並儲存當前執行緒,表示互斥鎖已經佔用。
(2)tryRelease:將state設定為0,表示互斥鎖已經被釋放。
public class SimpleMockLock implements Lock {
// 同步器例項
private final Sync sync = new Sync();
// 自定義的內部類:同步器
// 直接使用 state 表示鎖的狀態
// state = 0 表示鎖沒有被佔用
// state = 1 表示已經被佔用
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
// 接下來不需要使用CAS操作,因為下面的操作不存在併發場景
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return null;
}
static int i = 0;
public static void lockAndFastIncrease(Lock lock) {
lock.lock();
i++;
System.out.println(i);
lock.unlock();
}
public static void main(String[] args) {
LongAdder cnt = new LongAdder();
final int TURNS = 1000;
final int THREADS = 10;
final ExecutorService pool = Executors.newFixedThreadPool(THREADS);
final SimpleMockLock lock = new SimpleMockLock();
long start = System.currentTimeMillis();
for (int i = 0; i < THREADS; i++) {
pool.submit(() -> {
try {
for (int j = 0; j < TURNS; j++) {
lockAndFastIncrease(lock);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
final long l = System.currentTimeMillis() - start;
System.out.println("耗時:" + l);
pool.shutdown();
}
}
AQS鎖搶佔原理
流程的第一步,顯式鎖的lock方法會呼叫同步器基類AQS的模版方法acquire。acquire是AQS封裝好的獲取資源的公共入口,它是AQS提供的利用獨佔的方式獲取資源的方法,原始碼如下
public final void acquire(int arg) {
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire至少執行一次tryAcquire鉤子方法,tryAcquire預設丟擲一個異常,具體的獲取獨佔資源state的邏輯需要鉤子方法來實現。若呼叫tryAcquire嘗試成功,則acquire將直接返回,表示搶到鎖;若不成功,則將執行緒加入等待佇列中。
tryAcquire流程:CAS操作state欄位,將值從0改為1,若成功表示鎖未被佔用,返回true;若失敗,則返回false。如果是重入鎖,state欄位值會累積,表示重入次數。
直接入隊:addWaiter。在acquire模版方法中,如果鉤子方法tryAcquire返回失敗,就構造同步節點(獨佔式節點模式為Node.EXCLUSIVE),通過addWaiter方法將節點加入同步佇列的隊尾。
自選入隊:enq。addWaiter第一次嘗試在尾部新增節點失敗,意味有併發搶鎖發生,需要自旋。enq方法通過CAS自旋將節點新增到佇列尾部。
自旋搶佔:acquireQueued。節點入隊之後,啟動自旋鎖的流程,acquireQueued的主要邏輯:當前Node節點執行緒在死迴圈中不斷獲取同步狀態,並且在前驅結點上自旋,只有當前驅結點是頭結點時才嘗試獲取鎖。為了不浪費資源,如果頭結點獲取了鎖,那麼該節點會終止自旋,執行緒回去執行臨界區的程式碼。其餘處於自旋狀態的執行緒當然也不會自旋浪費資源,而是被掛起進入阻塞狀態。
Re
《Java高併發核心程式設計》
雖然講原始碼了,但是感覺有些囉嗦,沒有重點,筆記做下來感覺太注重細節,沒有總結和綱領,看下《Java併發程式設計之美》試試。