1. 程式人生 > 實用技巧 >Java AQS學習筆記

Java AQS學習筆記

1. AQS介紹

AQS的全稱為(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包下面。

AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。當然,我們自己也能利用AQS非常輕鬆容易地構造出符合我們自己需求的同步器。

2. AQS原理

AQS核心思想是,如果被請求的共享資源空閒,則將當前請求資源的執行緒設定為有效的工作執行緒,並且將共享資源設定為鎖定狀態。如果被請求的共享資源被佔用,那麼就需要一套執行緒阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH佇列鎖實現的,即將暫時獲取不到鎖的執行緒加入到佇列中。

CLH(Craig,Landin,and Hagersten) 佇列是一個虛擬的雙向佇列(虛擬的雙向佇列即不存在佇列例項,僅存在結點之間的關聯關係)。AQS是將每條請求共享資源的執行緒封裝成一個CLH鎖佇列的一個結點(Node)來實現鎖的分配。

看個AQS(AbstractQueuedSynchronizer)原理圖:

AQS使用一個int成員變數來表示同步狀態,通過內建的FIFO佇列來完成獲取資源執行緒的排隊工作。AQS使用CAS對該同步狀態進行原子操作實現對其值的修改。

private volatile int state;//共享變數,使用volatile修飾保證執行緒可見性

狀態資訊通過procted型別的getState,setState,compareAndSetState進行操作

//返回同步狀態的當前值
protected final int getState() {  
        return state;
}
 // 設定同步狀態的值
protected final void setState(int newState) { 
        state = newState;
}
//原子地(CAS操作)將同步狀態值設定為給定值update如果當前同步狀態的值等於expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.1. AQS 對資源的共享方式

  • 獨佔(Exclusive):只有一個執行緒能執行,如ReentrantLock。又可分為公平鎖和非公平鎖:
    • 公平鎖(FairSync):按照執行緒在佇列中的排隊順序,先到者先拿到鎖
    • 非公平鎖(NonfairSync):當執行緒要獲取鎖時,無視佇列順序直接去搶鎖,誰搶到就是誰的
  • 共享(Share):多個執行緒可同時執行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我們都會在後面講到。

ReentrantReadWriteLock 可以看成是組合式,因為ReentrantReadWriteLock也就是讀寫鎖允許多個執行緒同時對某一資源進行讀。

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源 state 的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在上層已經幫我們實現好了。

2.2. AQS底層使用了模板方法模式

同步器的設計是基於模板方法模式的,如果需要自定義同步器一般的方式是這樣(模板方法模式很經典的一個應用):

  1. 使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。(這些重寫方法很簡單,無非是對於共享資源state的獲取和釋放)
  2. 將AQS組合在自定義同步元件的實現中,並呼叫其模板方法,而這些模板方法會呼叫使用者重寫的方法。

這和我們以往通過實現介面的方式有很大區別,這是模板方法模式很經典的一個運用,下面簡單的給大家介紹一下模板方法模式,模板方法模式是一個很容易理解的設計模式之一。

模板方法模式是基於”繼承“的,主要是為了在不改變模板結構的前提下在子類中重新定義模板中的內容以實現複用程式碼。舉個很簡單的例子假如我們要去一個地方的步驟是:購票buyTicket()->安檢securityCheck()->乘坐某某工具回家ride()->到達目的地arrive()。我們可能乘坐不同的交通工具回家比如飛機或者火車,所以除了ride()方法,其他方法的實現幾乎相同。我們可以定義一個包含了這些方法的抽象類,然後使用者根據自己的需要繼承該抽象類然後修改 ride()方法。

AQS使用了模板方法模式,自定義同步器時需要重寫下面幾個AQS提供的模板方法:
isHeldExclusively()//該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
tryAcquire(int)//獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int)//獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int)//共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。

預設情況下,每個方法都丟擲 UnsupportedOperationException。 這些方法的實現必須是內部執行緒安全的,並且通常應該簡短而不是阻塞。AQS類中的其他方法都是final ,所以無法被其他類使用,只有這幾個方法可以被其他類使用。

  • 以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A執行緒lock()時,會呼叫tryAcquire()獨佔該鎖並將state+1。此後,其他執行緒再tryAcquire()時就會失敗,直到A執行緒unlock()到state=0(即釋放鎖)為止,其它執行緒才有機會獲取該鎖。當然,釋放鎖之前,A執行緒自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

  • 再以CountDownLatch以例,任務分為N個子執行緒去執行,state也初始化為N(注意N要與執行緒個數一致)。這N個子執行緒是並行執行的,每個子執行緒執行完後countDown()一次,state會CAS(Compare and Swap)減1。等到所有子執行緒都執行完後(即state=0),會unpark()主呼叫執行緒,然後主呼叫執行緒就會從await()函式返回,繼續後餘動作。

一般來說,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

3. CountDownLatch (倒計時器)

CountDownLatch是一個同步工具類,用來協調多個執行緒之間的同步。這個工具通常用來控制執行緒等待,它可以讓某一個執行緒等待直到倒計時結束,再開始執行。

3.1. CountDownLatch 的兩種典型用法

  1. 某一執行緒在開始執行前等待n個執行緒執行完畢。將 CountDownLatch 的計數器初始化為n :new CountDownLatch(n) ,每當一個任務執行緒執行完畢,就將計數器減1 countdownlatch.countDown(),當計數器的值變為0時,在CountDownLatch上 await() 的執行緒就會被喚醒。

一個典型應用場景就是啟動一個服務時,主執行緒需要等待多個元件載入完畢,之後再繼續執行。

  1. 實現多個執行緒開始執行任務的最大並行性。注意是並行性,不是併發,強調的是多個執行緒在某一時刻同時開始執行。類似於賽跑,將多個執行緒放到起點,等待發令槍響,然後同時開跑。做法是初始化一個共享的 CountDownLatch 物件,將其計數器初始化為 1 :new CountDownLatch(1) ,多個執行緒在開始執行任務前首先 coundownlatch.await(),當主執行緒呼叫 countDown() 時,計數器變為0,多個執行緒同時被喚醒。

3.2. CountDownLatch 的使用示例

public class CountDownLatchDemo {
    // 請求的數量
    private static final int threadCount = 50;

    public static void main(String[] args) throws InterruptedException {
        // 建立一個具有固定執行緒數量的執行緒池物件(如果這裡執行緒池的執行緒數量給太少的話你會發現執行的很慢)
        // 這裡需要注意:阿里編碼規範中不建議使用Executors建立執行緒池,感興趣的小夥伴自行Google去,此處為了直觀就暫時不遵循,見諒
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i + 1;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(new Random().nextInt(1000));// 模擬請求的耗時操作
                    System.out.println("threadnum:" + threadnum);
                    Thread.sleep(new Random().nextInt(1000));// 模擬請求的耗時操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();// 表示一個請求已經被完成
                }
            });
        }
        countDownLatch.await(); // 此處會阻塞主執行緒
        System.out.println("finished ...");
        threadPool.shutdown(); // 記得關閉執行緒池,不然main方法不會自動退出
    }
}

上面的程式碼中,我們定義了請求的數量為50,當這50個請求被處理完成之後,才會執行
System.out.println("finished ...");

3.3. CountDownLatch 的不足

CountDownLatch是一次性的,計數器的值只能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當CountDownLatch使用完畢後,它不能再次被使用。

4. CyclicBarrier(迴圈柵欄)

CyclicBarrier 和 CountDownLatch 非常類似,它也可以實現執行緒間的技術等待,但是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。

CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier預設的構造方法是 CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴 CyclicBarrier 我已經到達了屏障,然後當前執行緒被阻塞。

4.1. CyclicBarrier 的應用場景

舉個例子:
我們需要召集7個法師去尋找龍珠,在出發前需要集合開個會(第一個屏障),然後各自出發去尋找龍珠,找到龍珠後需要在一起(第二個屏障)召喚神龍;

4.2. CyclicBarrier 的使用示例

public class CyclicBarrierDemo {

    private static final int MAX_THREAD_NUM = 7;
    private static int mission = 1; // 屏障 {1: 召集7位法師, 2: 集齊7顆龍珠召喚神龍}

    public static void main(String[] args) {
        // 建立執行緒池
        ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_NUM);
        // 兩個任務使用同一個屏障
        CyclicBarrier cyclicBarrier = new CyclicBarrier(MAX_THREAD_NUM, ()->{
            if(mission == 1){
                // 屏障1: 召集法師任務已完成
                System.out.println("已經召集齊7名法師,任務已宣讀,出發尋找龍珠");
                mission = 2; // 任務1已完成,設定任務2:尋找龍珠
            }else{
                // 屏障2: 集齊龍珠已完成
                System.out.println("龍珠已集齊,開始召喚神龍");
            }
        });
        for (int i = 0; i < MAX_THREAD_NUM; i++) {
            final int num = i + 1;
            threadPool.execute(()->{
                try {
                    Thread.sleep(new Random().nextInt(1000)); // 模擬召集過程
                    System.out.println("第" + num + "名法師已就位");
                    cyclicBarrier.await(); // 屏障1阻塞執行緒
                    // 必須等7為法師全部召集後出發尋找龍珠
                    System.out.println("第" + num + "名法師已出發");
                    Thread.sleep(new Random().nextInt(1000)); // 模擬尋找龍珠過程
                    System.out.println("第" + num + "顆龍珠已找到");
                    cyclicBarrier.await(); // 屏障2阻塞執行緒
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
執行結果,如下:
第5名法師已就位
第6名法師已就位
第3名法師已就位
第1名法師已就位
第7名法師已就位
第4名法師已就位
第2名法師已就位
已經召集齊7名法師,任務已宣讀,出發尋找龍珠
第2名法師已出發
第5名法師已出發
第3名法師已出發
第4名法師已出發
第6名法師已出發
第7名法師已出發
第1名法師已出發
第5顆龍珠已找到
第3顆龍珠已找到
第7顆龍珠已找到
第2顆龍珠已找到
第1顆龍珠已找到
第4顆龍珠已找到
第6顆龍珠已找到
龍珠已集齊,開始召喚神龍

可以看到當執行緒數量也就是請求數量達到我們定義的 7 個的時候, await方法之後的方法才被執行。

4.3. CyclicBarrier和CountDownLatch的區別

CountDownLatch是計數器,只能使用一次,而CyclicBarrier的計數器提供reset功能,可以多次使用。但是我不那麼認為它們之間的區別僅僅就是這麼簡單的一點。我們來從jdk作者設計的目的來看,javadoc是這麼描述它們的:

  • CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一個或者多個執行緒,等待其他多個執行緒完成某件事情之後才能執行;)
  • CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多個執行緒互相等待,直到到達同一個同步點,再繼續一起執行。)

對於CountDownLatch來說,重點是“一個執行緒(多個執行緒)等待”,而其他的N個執行緒在完成“某件事情”之後,可以終止,也可以等待。而對於CyclicBarrier,重點是多個執行緒,在任意一個執行緒沒有完成,所有的執行緒都必須等待。

CountDownLatch是計數器,執行緒完成一個記錄一個,只不過計數是遞減,而CyclicBarrier更像是一個閥門,需要所有執行緒都到達,閥門才能開啟,然後繼續執行。

CountDownLatch CyclicBarrier
減計數方式 加計數方式
計數器為0時釋放所有等待的執行緒 計數器達到指定值時釋放所有等待執行緒
計數器為0,無法重置 計數器達到指定值時,重置為0
呼叫countDown方法計數器減1,呼叫await方法只進行阻塞,對計數器沒有影響 呼叫awati方法計數器加1,若加1後值不等於構造方法初始化的值,執行緒阻塞
不可重複利用 可重複利用

5. Semaphore(訊號量)-允許多個執行緒同時訪問

synchronized 和 ReentrantLock 都是一次只允許一個執行緒訪問某個資源,Semaphore(訊號量)可以指定多個執行緒同時訪問某個資源。示例程式碼如下:

public class SemaphoreDemo {
    // 請求的數量
    private static final int threadCount = 50;

    public static void main(String[] args) throws InterruptedException {
        // 建立一個具有固定執行緒數量的執行緒池
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        // 一次只能允許執行的執行緒數量。
        final Semaphore semaphore = new Semaphore(20);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            threadPool.execute(() -> {
                try {
                    semaphore.acquire();// 獲取一個許可,所以可執行執行緒數量為20/1=20
                    Thread.sleep(new Random().nextInt(1000));// 模擬請求的耗時操作
                    System.out.println("threadNum:" + threadNum);
                    Thread.sleep(new Random().nextInt(1000));// 模擬請求的耗時操作
                    semaphore.release();// 釋放一個許可
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("finished ...");
        threadPool.shutdown();
    }
}

執行 acquire 方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個 release 方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可證這個物件,Semaphore只是維持了一個可獲得許可證的數量。 Semaphore經常用於限制獲取某種資源的執行緒數量。

當然一次也可以一次拿取和釋放多個許可,不過一般沒有必要這樣做:
semaphore.acquire(5);// 獲取5個許可,所以可執行執行緒數量為20/5=4
test(threadnum);
semaphore.release(5);// 獲取5個許可,所以可執行執行緒數量為20/5=4

除了 acquire方法之外,另一個比較常用的與之對應的方法是tryAcquire方法,該方法如果獲取不到許可就立即返回false。

Semaphore 有兩種模式,公平模式和非公平模式。

  • 公平模式: 呼叫acquire的順序就是獲取許可證的順序,遵循FIFO;
  • 非公平模式: 搶佔式的。
    Semaphore 對應的兩個構造方法如下:
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

這兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,預設非公平模式。

參考資料:
https://www.cnblogs.com/waterystone/p/4920797.html

https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html

https://blog.csdn.net/qq_19431333/article/details/70212663

https://blog.csdn.net/u010185262/article/details/54692886

https://blog.csdn.net/tolcf/article/details/50925145?utm_source=blogxgwz0