1. 程式人生 > 其它 >AQS抽象同步器的核心原理與實踐

AQS抽象同步器的核心原理與實踐

基於CAS自旋實現的輕量級鎖有兩個問題:

(1)CAS空自旋會浪費大量CPU資源。

(2)在CMP架構的CPU會導致“匯流排風暴”。

解決CAS空自旋的有效方式之一是以空間換時間,比較常見的方案由兩種:分散操作和熱點和使用佇列削峰。JUC使用的是佇列削峰的方案解決CAS效能問題(LongAdder是分散熱點),它提供了一個雙向佇列的削峰基類——抽象基礎類AbstractQueuedSynchronizer(AQS)。

鎖與佇列的關係

1.CLH鎖的內部佇列

CLH自旋鎖使用的CLH是一個單向佇列,也是一個FIFO佇列。在獨佔鎖中,競爭資源在一個時間點只能被一個執行緒鎖訪問,佇列頭部的節點表示佔有鎖的節點,新加入的搶鎖執行緒需要等待,會插入佇列的尾部。

2.分散式鎖的內部佇列

在分散式鎖的實現中,以ZooKeeper的分散式鎖為例,就是建立臨時節點,順序執行。

3.AQS的內部佇列

AQS是JUC提供的一個用於構建鎖和同步容器的基礎類。例如ReentrantLock、Semaphore、CountDownLatch、FutureTask等都是基於AQS構建的。AQS解決了實現同步容器時設計的大量細節問題。

AQS是CLH佇列的一個變種。AQS佇列內部維護的是一個FIFO的雙向連結串列,每個節點有前驅結點和後繼節點。每個節點由執行緒封裝,當執行緒爭搶鎖失敗後會封裝成節點加入AQS佇列中;當獲取鎖的執行緒釋放鎖以後,會從佇列中喚醒一個阻塞的節點(執行緒)。

AQS的核心成員

AQS出於“分離變與不變”(人話:單一職責和開閉原則)的原則,基於模版模式實現。AQS為鎖獲取、鎖釋放的排隊和出隊過程提供了一系列的模版方法。由於JUC的顯式鎖種類豐富,因此AQS將不同鎖的具體操作抽取為鉤子方法,讓各種鎖的子類去實現。

狀態標誌位

AQS中維持了一個單一的volatile變數state,state表示鎖的狀態。

private volatile int state;

state保證了可見性,所以任何執行緒通過getState()獲取狀態都可以得到最新值。AQS提供了compareAndSetState()方法利用底層UnSafe的CAS機制來實現原子性。

protected final boolean compareAndSetState(int exepect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

以ReentrantLock為例,state初始化為0,表示未鎖定。A執行緒執行該鎖的lock()操作時,會呼叫tryAcquire獨佔該鎖並將state加1。此後,其他執行緒再tryAcquire()時就會失敗,直到A執行緒unlock()到state=0為止,其他執行緒才有機會獲取該鎖。當然,釋放鎖之前,A執行緒自己是可以重複獲取鎖的(state會累加),這就是可重入。但是,獲取多少次就要釋放多少次,這樣才能保證state回到零態。

AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,這個基類只有一個變數exclusiveOwnerThread,表示當前佔用該鎖的執行緒,並且提供了get和set方法。

佇列節點類

Node

FIFO雙向同步佇列

每當執行緒通過AQS獲取鎖失敗時,執行緒將被封裝成一個Node節點,通過CAS原子操作插入佇列尾部。當有執行緒釋放鎖時,AQS會嘗試讓隊頭的後繼節點佔用鎖。

JUC顯式鎖與AQS的關係

AQS是一個同步器,它實現了鎖的基本抽象功能,該類是由模版模式來實現的。

1.ReentrantLock與AQS的組合關係

ReentrantLock是一個可重入的互斥鎖,可以被單個執行緒多次獲取。ReentrantLock把所有Lock介面的操作都委派到一個Sync類上,該類繼承了AbstractQueuedSynchronizer:

static abstract class Sync extends AbstractQueuedSynchronizer { ... }

ReentrantLock支援公平鎖和非公平鎖。預設情況下是非公平鎖。

final static class NonfairSync extends Sync { ... }
final static class FairSync extends Sync { ... }

由ReentrantLock的lock和unlock的原始碼可以看到,它們只是分別呼叫了sync物件的lock和release方法。

public void lock() {
        sync.acquire(1);
}

public void unlock() {
    sync.release(1);
}

而Sync內部類只是AQS的子類,所以本質是ReentrantLock的操作是委託給AQS完成的。

AQS的模版流程

AQS定義了兩種資源共享方式:

  • Exclusive(獨享鎖):只有一個執行緒能佔有鎖資源,如ReentrantLock。
  • share(共享鎖):多個執行緒可以同時佔有資源,如Semaphore、CountDownLatch。

AQS為不同的資源共享方式提供了不同的模版流程,AQS提供了一種實現阻塞鎖和依賴FIFO等待佇列的同步器的框架。自定義的同步器只需要實現共享資源state的獲取與釋放方式即可,這些邏輯都編寫在鉤子方法中。無論是共享鎖還是獨佔鎖,AQS在執行模版流程時都會回撥自定義的鉤子方法。

AQS的鉤子方法

自定義同步器時,AQS中需要重寫的鉤子方法如下:

  • tryAcquire(int):獨佔鎖鉤子,嘗試獲取資源,若成功則返回true,若失敗則返回false。
  • tryRekease(int):獨佔鎖鉤子,嘗試釋放資源,若成功則返回true,若失敗則返回false。
  • tryAcquireShared(int):共享鎖鉤子,嘗試獲取資源,負數表示失敗;
  • isHeldExclusively():獨佔鎖鉤子,判斷該執行緒是否正在獨佔資源。只有用到condition條件佇列時才需要去實現它。

通過AQS實現簡單的獨佔鎖

SimpleMockLock只實現了Lock介面的兩個方法:

(1)lock方法:完成顯式鎖的搶佔。

(2)unlock方法:完成顯式鎖的釋放。

SimpleMockLock的鎖搶佔和釋放是委託給Sync例項的方法來實現的。在搶佔鎖時,AQS的acquire會呼叫tryAcquire鉤子方法;釋放鎖時,AQS的release會呼叫tryRelease鉤子方法。

內部類Sync繼承AQS類時提供了一下兩個鉤子方法的實現:

(1)tryAcquire:將state設定為1並儲存當前執行緒,表示互斥鎖已經佔用。

(2)tryRelease:將state設定為0,表示互斥鎖已經被釋放。

public class SimpleMockLock implements Lock {

    // 同步器例項
    private final Sync sync = new Sync();

    // 自定義的內部類:同步器
    // 直接使用 state 表示鎖的狀態
    // state = 0 表示鎖沒有被佔用
    // state = 1 表示已經被佔用
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // 接下來不需要使用CAS操作,因為下面的操作不存在併發場景
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    static int i = 0;

    public static void lockAndFastIncrease(Lock lock) {
        lock.lock();
        i++;
        System.out.println(i);
        lock.unlock();
    }

    public static void main(String[] args) {
        LongAdder cnt = new LongAdder();
        final int TURNS = 1000;
        final int THREADS = 10;
        final ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        final SimpleMockLock lock = new SimpleMockLock();
        long start = System.currentTimeMillis();
        for (int i = 0; i < THREADS; i++) {
            pool.submit(() -> {
                try {
                    for (int j = 0; j < TURNS; j++) {
                        lockAndFastIncrease(lock);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        final long l = System.currentTimeMillis() - start;
        System.out.println("耗時:" + l);
        pool.shutdown();
    }

}

AQS鎖搶佔原理

流程的第一步,顯式鎖的lock方法會呼叫同步器基類AQS的模版方法acquire。acquire是AQS封裝好的獲取資源的公共入口,它是AQS提供的利用獨佔的方式獲取資源的方法,原始碼如下

public final void acquire(int arg) {
    if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire至少執行一次tryAcquire鉤子方法,tryAcquire預設丟擲一個異常,具體的獲取獨佔資源state的邏輯需要鉤子方法來實現。若呼叫tryAcquire嘗試成功,則acquire將直接返回,表示搶到鎖;若不成功,則將執行緒加入等待佇列中。

tryAcquire流程:CAS操作state欄位,將值從0改為1,若成功表示鎖未被佔用,返回true;若失敗,則返回false。如果是重入鎖,state欄位值會累積,表示重入次數。

直接入隊:addWaiter。在acquire模版方法中,如果鉤子方法tryAcquire返回失敗,就構造同步節點(獨佔式節點模式為Node.EXCLUSIVE),通過addWaiter方法將節點加入同步佇列的隊尾。

自選入隊:enq。addWaiter第一次嘗試在尾部新增節點失敗,意味有併發搶鎖發生,需要自旋。enq方法通過CAS自旋將節點新增到佇列尾部。

自旋搶佔:acquireQueued。節點入隊之後,啟動自旋鎖的流程,acquireQueued的主要邏輯:當前Node節點執行緒在死迴圈中不斷獲取同步狀態,並且在前驅結點上自旋,只有當前驅結點是頭結點時才嘗試獲取鎖。為了不浪費資源,如果頭結點獲取了鎖,那麼該節點會終止自旋,執行緒回去執行臨界區的程式碼。其餘處於自旋狀態的執行緒當然也不會自旋浪費資源,而是被掛起進入阻塞狀態。

Re

《Java高併發核心程式設計》

雖然講原始碼了,但是感覺有些囉嗦,沒有重點,筆記做下來感覺太注重細節,沒有總結和綱領,看下《Java併發程式設計之美》試試。