原始碼分析:Semaphore之訊號量
阿新 • • 發佈:2020-11-21
## 簡介
Semaphore 又名計數訊號量,從概念上來講,訊號量初始並維護一定數量的許可證,使用之前先要先獲得一個許可,用完之後再釋放一個許可。訊號量通常用於限制執行緒的數量來控制訪問某些資源,從而達到單機限流的目的,比如SpringCloud 中的Zuul 元件用的是 Hystrix 的訊號量(semaphore)隔離模式。
## 原始碼分析
### 重要的內部類
Semaphore 和 ReentrantLock 內部類完全相似, 有3個重要的內部類,分別也是 `Sync`、`NonfairSync`和`FairSync`;
1. Sync 是後面兩個的父類,繼承至AbstractQueuedSynchronizer(AQS)
2. NonfairSync和FairSync都繼承至Sync
3. NonfairSync 主要用於實現非公平鎖,FairSync 主要用於實現公平鎖
如果你看了前面幾天關於鎖的原始碼分析,是不是發現它們的套路都差不多呢?
### **重要的屬性**
和 ReentrantLock 也完全一樣,只有一個重要的屬性,同步器sync:
```java
private final Sync sync;
```
### 兩個構造方法
```java
// ①指定初始許可證數量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// ②指定初始許可證數量和公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
```
兩個構造方法最後都是初始化許可證數量,呼叫的也就是同步器裡面的構造方法來初始化AQS 裡面的state欄位
```java
// Sync 的構造方法
Sync(int permits) {
setState(permits);
}
// AQS 中的程式碼
protected final void setState(int newState) {
state = newState;
}
```
### 獲取許可:acquire()
預設每次獲得1個許可,如果沒有可用的許可證會阻塞執行緒,或者被中斷丟擲異常。
原始碼分析:
```java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 預設每次獲得1個許可
}
```
`acquireSharedInterruptibly(1)`會呼叫 AQS 裡面的方法:
```java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 執行緒被中斷,丟擲異常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // tryAcquireShared 嘗試獲得許可,返回小於0 表示沒有獲得許可
doAcquireSharedInterruptibly(arg); // 沒有獲得許可,排隊阻塞
}
```
`tryAcquireShared(arg)`方法:
tryAcquireShared 有兩種實現,也就是 FairSync(公平模式) 和 NonfairSync(非公平模式) 不同實現。
1. 公平模式的實現程式碼 `FairSync.tryAcquireShared`:
```java
// acquires
protected int tryAcquireShared(int acquires) {
for (;;) { // 自旋
if (hasQueuedPredecessors()) // 檢查是否有更早的執行緒在排隊獲得許可
return -1; // 有排隊的執行緒,返回-1,小於0表示獲得許可失敗
int available = getState(); // 獲得可用許可數
int remaining = available - acquires; // 減去一個許可,計算剩餘的許可數
if (remaining < 0 || compareAndSetState(available, remaining))
// remaining < 0 成立的話就說明獲取許可失敗了,出去也要排隊阻塞執行緒
return remaining;
}
}
```
2. 非公平模式的實現程式碼NonfairSync.tryAcquireShared:
```java
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); // 呼叫父類Sync裡面的實現方法
}
// 父類Sync裡面的實現方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 自旋
int available = getState(); // 獲得可用許可數
int remaining = available - acquires; // 減去一個許可,計算剩餘的許可數
if (remaining < 0 || compareAndSetState(available, remaining))
// remaining < 0 成立的話就說明獲取許可失敗了,出去也要排隊阻塞執行緒
return remaining;
}
}
```
有沒有發現他們的程式碼非常相識?公平模式的實現就只是比非公平模式多了一個`hasQueuedPredecessors()` 方法呼叫判斷,這個方法主要就是檢查排隊的佇列裡面是不是還有其他執行緒。在之前分析ReentrantLock 原始碼的文章中也有提到。
如果`tryAcquireShared` 方法沒有獲得許可(返回值小於0),就會進入到AQS 的 doAcquireSharedInterruptibly 方法:
```java
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 為當前執行緒建立排隊節點,並加入到佇列
// addWaiter方法的分析在之前的AQS分析文章已經分析過了
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { // 自旋,嘗試獲得許可,阻塞執行緒,喚醒後繼續獲得許可
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 嘗試獲得許可
if (r >= 0) { // 獲得許可
setHeadAndPropagate(node, r); // 設定排隊的頭節點
p.next = null; // help GC
failed = false;
return; // 執行緒獲得許可,退出
}
}
// shouldParkAfterFailedAcquire 如果執行緒應阻塞,則返回true
// 之前的AQS分析文章已經分析過了
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 被中斷了,丟擲異常
throw new InterruptedException();
}
} finally {
if (failed) // 節點被取消
cancelAcquire(node);
}
}
```
**獲得許可總結:**
1. 獲得許可就是對初始化的許可證進行減1,直到沒有許可證了就會進入到佇列排隊阻塞執行緒
2. 公平模式下,會去看排隊的佇列是否有更早的執行緒在排隊獲取
3. 非公平模式下,不會去檢查排隊佇列
### 釋放許可:acquire()
預設釋放一個許可
```java
public void release() {
sync.releaseShared(1); // 釋放一個許可
}
```
呼叫的還是AQS框架裡面的程式碼實現:
```java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // tryReleaseShared 是訊號量自己實現的
doReleaseShared();
return true;
}
return false;
}
```
tryReleaseShared 方法實現:
說明一下,這個釋放許可的實現,公平模式和非公平模式都是呼叫的同一個實現。
```java
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 自旋
int current = getState(); //當前可用的許可
int next = current + releases; // 加上釋放的許可
if (next < current) // 以防你傳個負的樹過來釋放
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // CAS 修改,成功就是釋放成功,失敗的話繼續自旋
return true;
}
}
```
**釋放許可總結:**
1. 釋放許可就是把開始獲得的許可還回去
2. 用到CAS來修改許可證數,用自旋來保證一定會還回去(直到還成功為止)
### 其他API方法
Semaphore 還有其他的很多API可以呼叫,但其實原始碼都差不多,所以這裡就不繼續分析了,如果你把我之前分析AQS、ReentrantLock、ReentrantReadWriteLock的原始碼文章也看了,你就會發現這個Semaphore 的原始碼讀起來非常簡單了,這裡再簡單說下其他API的作用。
1. void acquire(int permits)
和上面分析的acquire()功能一樣,只不過你可以指定獲取許可數,原始碼在減的時候就不是減1了,在釋放的時候也要注意,最好保持一致。
被中斷會丟擲異常
2. void acquireUninterruptibly()
Uninterruptibly(),和 acquire() 方法的唯一區別就是執行緒被中斷了也不會丟擲異常,其他完全一致
3. void acquireUninterruptibly(int permits)
被中斷不丟擲異常,指定每次獲取許可的數量
4. boolean tryAcquire()
只會嘗試一次獲得許可,獲得成功了就返回true,失敗了不會去排隊阻塞執行緒。
還有幾個帶引數的,意思都差不多。
5. int availablePermits()
返回可用的許可數
6. void release(int permits)
一次釋放指定的許可數
## Semaphore 總結
1. Semaphore 也是基於AQS框架來實現的
2. Semaphore 也有公平和非公平之說,公平就是在獲取許可之前會先看一下佇列是否有其他執行緒在排隊
3. Semaphore 的初始訊號量必須指定,如果是1的話,功能就相當於一個互斥鎖了
4. Semaphore 支援重入獲得許可,但是這裡要注意的是,如果一個執行緒先獲得了許可,沒釋放又來獲得許可,這時候許可數不足的情況下,當前執行緒會被阻塞,有可能會死鎖。
5. 如果這篇文章沒看懂,可以先去看看的之前關於AQS(AQS分析文章裡面有一個自己實現的共享鎖,和這裡的訊號量非常相似)、ReentrantLock和RRWLock原始碼分析的文章,所有文章看完,保證你一懂百懂,奧