1. 程式人生 > >原始碼分析:Semaphore之訊號量

原始碼分析:Semaphore之訊號量

## 簡介 Semaphore 又名計數訊號量,從概念上來講,訊號量初始並維護一定數量的許可證,使用之前先要先獲得一個許可,用完之後再釋放一個許可。訊號量通常用於限制執行緒的數量來控制訪問某些資源,從而達到單機限流的目的,比如SpringCloud 中的Zuul 元件用的是 Hystrix 的訊號量(semaphore)隔離模式。 ## 原始碼分析 ### 重要的內部類 Semaphore 和 ReentrantLock 內部類完全相似, 有3個重要的內部類,分別也是 `Sync`、`NonfairSync`和`FairSync`; 1. Sync 是後面兩個的父類,繼承至AbstractQueuedSynchronizer(AQS) 2. NonfairSync和FairSync都繼承至Sync 3. NonfairSync 主要用於實現非公平鎖,FairSync 主要用於實現公平鎖 如果你看了前面幾天關於鎖的原始碼分析,是不是發現它們的套路都差不多呢? ### **重要的屬性** 和 ReentrantLock 也完全一樣,只有一個重要的屬性,同步器sync: ```java private final Sync sync; ``` ### 兩個構造方法 ```java // ①指定初始許可證數量 public Semaphore(int permits) { sync = new NonfairSync(permits); } // ②指定初始許可證數量和公平模式 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } ``` 兩個構造方法最後都是初始化許可證數量,呼叫的也就是同步器裡面的構造方法來初始化AQS 裡面的state欄位 ```java // Sync 的構造方法 Sync(int permits) { setState(permits); } // AQS 中的程式碼 protected final void setState(int newState) { state = newState; } ``` ### 獲取許可:acquire() 預設每次獲得1個許可,如果沒有可用的許可證會阻塞執行緒,或者被中斷丟擲異常。 原始碼分析: ```java public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); // 預設每次獲得1個許可 } ``` `acquireSharedInterruptibly(1)`會呼叫 AQS 裡面的方法: ```java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 執行緒被中斷,丟擲異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // tryAcquireShared 嘗試獲得許可,返回小於0 表示沒有獲得許可 doAcquireSharedInterruptibly(arg); // 沒有獲得許可,排隊阻塞 } ``` `tryAcquireShared(arg)`方法: tryAcquireShared 有兩種實現,也就是 FairSync(公平模式) 和 NonfairSync(非公平模式) 不同實現。 1. 公平模式的實現程式碼 `FairSync.tryAcquireShared`: ```java // acquires protected int tryAcquireShared(int acquires) { for (;;) { // 自旋 if (hasQueuedPredecessors()) // 檢查是否有更早的執行緒在排隊獲得許可 return -1; // 有排隊的執行緒,返回-1,小於0表示獲得許可失敗 int available = getState(); // 獲得可用許可數 int remaining = available - acquires; // 減去一個許可,計算剩餘的許可數 if (remaining < 0 || compareAndSetState(available, remaining)) // remaining < 0 成立的話就說明獲取許可失敗了,出去也要排隊阻塞執行緒 return remaining; } } ``` 2. 非公平模式的實現程式碼NonfairSync.tryAcquireShared: ```java protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); // 呼叫父類Sync裡面的實現方法 } // 父類Sync裡面的實現方法 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 自旋 int available = getState(); // 獲得可用許可數 int remaining = available - acquires; // 減去一個許可,計算剩餘的許可數 if (remaining < 0 || compareAndSetState(available, remaining)) // remaining < 0 成立的話就說明獲取許可失敗了,出去也要排隊阻塞執行緒 return remaining; } } ``` 有沒有發現他們的程式碼非常相識?公平模式的實現就只是比非公平模式多了一個`hasQueuedPredecessors()` 方法呼叫判斷,這個方法主要就是檢查排隊的佇列裡面是不是還有其他執行緒。在之前分析ReentrantLock 原始碼的文章中也有提到。 如果`tryAcquireShared` 方法沒有獲得許可(返回值小於0),就會進入到AQS 的 doAcquireSharedInterruptibly 方法: ```java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 為當前執行緒建立排隊節點,並加入到佇列 // addWaiter方法的分析在之前的AQS分析文章已經分析過了 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 自旋,嘗試獲得許可,阻塞執行緒,喚醒後繼續獲得許可 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // 嘗試獲得許可 if (r >= 0) { // 獲得許可 setHeadAndPropagate(node, r); // 設定排隊的頭節點 p.next = null; // help GC failed = false; return; // 執行緒獲得許可,退出 } } // shouldParkAfterFailedAcquire 如果執行緒應阻塞,則返回true // 之前的AQS分析文章已經分析過了 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 被中斷了,丟擲異常 throw new InterruptedException(); } } finally { if (failed) // 節點被取消 cancelAcquire(node); } } ``` **獲得許可總結:** 1. 獲得許可就是對初始化的許可證進行減1,直到沒有許可證了就會進入到佇列排隊阻塞執行緒 2. 公平模式下,會去看排隊的佇列是否有更早的執行緒在排隊獲取 3. 非公平模式下,不會去檢查排隊佇列 ### 釋放許可:acquire() 預設釋放一個許可 ```java public void release() { sync.releaseShared(1); // 釋放一個許可 } ``` 呼叫的還是AQS框架裡面的程式碼實現: ```java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // tryReleaseShared 是訊號量自己實現的 doReleaseShared(); return true; } return false; } ``` tryReleaseShared 方法實現: 說明一下,這個釋放許可的實現,公平模式和非公平模式都是呼叫的同一個實現。 ```java protected final boolean tryReleaseShared(int releases) { for (;;) { // 自旋 int current = getState(); //當前可用的許可 int next = current + releases; // 加上釋放的許可 if (next < current) // 以防你傳個負的樹過來釋放 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) // CAS 修改,成功就是釋放成功,失敗的話繼續自旋 return true; } } ``` **釋放許可總結:** 1. 釋放許可就是把開始獲得的許可還回去 2. 用到CAS來修改許可證數,用自旋來保證一定會還回去(直到還成功為止) ### 其他API方法 Semaphore 還有其他的很多API可以呼叫,但其實原始碼都差不多,所以這裡就不繼續分析了,如果你把我之前分析AQS、ReentrantLock、ReentrantReadWriteLock的原始碼文章也看了,你就會發現這個Semaphore 的原始碼讀起來非常簡單了,這裡再簡單說下其他API的作用。 1. void acquire(int permits) 和上面分析的acquire()功能一樣,只不過你可以指定獲取許可數,原始碼在減的時候就不是減1了,在釋放的時候也要注意,最好保持一致。 被中斷會丟擲異常 2. void acquireUninterruptibly() Uninterruptibly(),和 acquire() 方法的唯一區別就是執行緒被中斷了也不會丟擲異常,其他完全一致 3. void acquireUninterruptibly(int permits) 被中斷不丟擲異常,指定每次獲取許可的數量 4. boolean tryAcquire() 只會嘗試一次獲得許可,獲得成功了就返回true,失敗了不會去排隊阻塞執行緒。 還有幾個帶引數的,意思都差不多。 5. int availablePermits() 返回可用的許可數 6. void release(int permits) 一次釋放指定的許可數 ## Semaphore 總結 1. Semaphore 也是基於AQS框架來實現的 2. Semaphore 也有公平和非公平之說,公平就是在獲取許可之前會先看一下佇列是否有其他執行緒在排隊 3. Semaphore 的初始訊號量必須指定,如果是1的話,功能就相當於一個互斥鎖了 4. Semaphore 支援重入獲得許可,但是這裡要注意的是,如果一個執行緒先獲得了許可,沒釋放又來獲得許可,這時候許可數不足的情況下,當前執行緒會被阻塞,有可能會死鎖。 5. 如果這篇文章沒看懂,可以先去看看的之前關於AQS(AQS分析文章裡面有一個自己實現的共享鎖,和這裡的訊號量非常相似)、ReentrantLock和RRWLock原始碼分析的文章,所有文章看完,保證你一懂百懂,奧