【搞定Java併發程式設計】第26篇:Java中的併發工具類之控制併發執行緒數的 Semaphore
上一篇:Java中的併發工具類之同步屏障 CyclicBarrier
本文目錄:
本文轉載自:https://mp.weixin.qq.com/s/LS8YBKpiJnHEY1kMWmwoxg
推薦閱讀:剖析基於併發AQS的共享鎖的實現(基於訊號量Semaphore)【寫的非常好】
Semphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。
Semaphore是JUC包中比較常用到的一個類,它是AQS共享模式的一個應用,可以允許多個執行緒同時對共享資源進行操作,並且可以有效的控制併發數,利用它可以很好的實現流量控制。
Semaphore提供了一個許可證的概念,可以把這個許可證看作公共汽車車票,只有成功獲取車票的人才能夠上車,並且車票是有一定數量的,不可能毫無限制的發下去,這樣就會導致公交車超載。所以當車票發完的時候(公交車以滿載),其他人就只能等下一趟車了。如果中途有人下車,那麼他的位置將會空閒出來,因此如果這時其他人想要上車的話就又可以獲得車票了。
利用Semaphore可以實現各種池,我們在本篇末尾將會動手寫一個簡易的資料庫連線池。
我們先看下Semaphore這個類的整體內部結構:
public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {...} static final class NonfairSync extends Sync {...} static final class FairSync extends Sync {...} public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } // 獲取許可證 public void acquire() throws InterruptedException {...} // 嘗試獲取許可證 public boolean tryAcquire() {...} // 歸還許可證 public void release() {...} // 獲取permits個許可證 public void acquire(int permits) throws InterruptedException {...} // 嘗試獲取permits個許可證 public boolean tryAcquire(int permits) {...} // 歸還permits個許可證 public void release(int permits) {...} // 返回此訊號量中當前可用的許可證數 public int availablePermits() {...} public int drainPermits() {...} // 減少reduction個許可證 protected void reducePermits(int reduction) {...} // 是否有執行緒正在等待許可證 public final boolean hasQueuedThreads() {...} // 返回正在等待許可證的執行緒數 public final int getQueueLength() {...} // 返回所有等待獲取許可證的執行緒集合 protected Collection<Thread> getQueuedThreads() {...} }
可以得出下面關於 Semaphore 的類關係圖:
從上面的Semaphore類結構中可用發現它其實也是 AQS 中共享鎖的使用,因為每個執行緒共享一個池嘛。
套路解讀:建立 Semaphore 例項的時候,需要一個引數 permits,這個基本上可以確定是設定給 AQS 的 state 的,然後每個執行緒呼叫 acquire 的時候,執行 state = state - 1,release 的時候執行 state = state + 1,當然,acquire 的時候,如果 state = 0,說明沒有資源了,需要等待其他執行緒 release。
首先我們來看一下Semaphore的構造器:
// 構造器1
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 構造器2
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore提供了兩個帶參構造器,沒有提供無參構造器。這兩個構造器都必須傳入一個初始的許可證數量,使用構造器1構造出來的訊號量在獲取許可證時會採用非公平方式獲取,使用構造器2可以通過引數指定獲取許可證的方式(公平or非公平)。
Semaphore主要對外提供了兩類API,獲取許可證和釋放許可證,預設的是獲取和釋放一個許可證,也可以傳入引數來同時獲取和釋放多個許可證。在本篇中我們只講每次獲取和釋放一個許可證的情況。
1、獲取許可證
// 獲取一個許可證(響應中斷)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 獲取一個許可證(不響應中斷)
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 嘗試獲取許可證(非公平獲取)
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 嘗試獲取許可證(定時獲取)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
上面的API是Semaphore提供的預設獲取許可證操作。每次只獲取一個許可證,這也是現實生活中較常遇到的情況。除了直接獲取還提供了嘗試獲取,直接獲取操作在失敗之後可能會阻塞執行緒,而嘗試獲取則不會。另外還需注意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時我們比較常用到的是acquire方法去獲取許可證。下面我們就來看看它是怎樣獲取的。可以看到acquire方法裡面直接就是呼叫sync.acquireSharedInterruptibly,這個方法是AQS裡面的方法,我們簡單講一下。
// 以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 首先判斷執行緒是否中斷, 如果是則丟擲異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 1.嘗試去獲取鎖
if (tryAcquireShared(arg) < 0) {
// 2. 如果獲取失敗則進人該方法
doAcquireSharedInterruptibly(arg);
}
}
acquireSharedInterruptibly方法首先就是去呼叫tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS裡面是抽象方法,FairSync和NonfairSync這兩個派生類實現了該方法的邏輯。FairSync實現的是公平獲取的邏輯,而NonfairSync實現的非公平獲取的邏輯。
abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平方式嘗試獲取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 獲取可用許可證
int available = getState();
// 獲取剩餘許可證
int remaining = available - acquires;
// 1.如果remaining小於0則直接返回remaining
// 2.如果remaining大於0則先更新同步狀態再返回remaining
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
// 非公平同步器
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 嘗試獲取許可證
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 公平同步器
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 嘗試獲取許可證
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判斷同步佇列前面有沒有人排隊
if (hasQueuedPredecessors()) {
// 如果有的話就直接返回-1,表示嘗試獲取失敗
return -1;
}
// 獲取可用許可證
int available = getState();
// 獲取剩餘許可證
int remaining = available - acquires;
// 1.如果remaining小於0則直接返回remaining
// 2.如果remaining大於0則先更新同步狀態再返回remaining
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
裡需要注意的是NonfairSync的tryAcquireShared方法直接呼叫的是nonfairTryAcquireShared方法,這個方法是在父類Sync裡面的。非公平獲取鎖的邏輯是先取出當前同步狀態(同步狀態表示許可證個數),將當前同步狀態減去傳入的引數,如果結果不小於0的話證明還有可用的許可證,那麼就直接使用CAS操作更新同步狀態的值,最後不管結果是否小於0都會返回該結果值。
這裡我們要了解tryAcquireShared方法返回值的含義,返回負數表示獲取失敗,零表示當前執行緒獲取成功但後續執行緒不能再獲取,正數表示當前執行緒獲取成功並且後續執行緒也能夠獲取。我們再來看acquireSharedInterruptibly方法的程式碼。
// 以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 首先判斷執行緒是否中斷, 如果是則丟擲異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 1.嘗試去獲取鎖
// 負數:表示獲取失敗
// 零值:表示當前執行緒獲取成功, 但是後繼執行緒不能再獲取了
// 正數:表示當前執行緒獲取成功, 並且後繼執行緒同樣可以獲取成功
if (tryAcquireShared(arg) < 0) {
// 2. 如果獲取失敗則進人該方法
doAcquireSharedInterruptibly(arg);
}
}
如果返回的remaining小於0的話就代表獲取失敗,因此tryAcquireShared(arg) < 0就為true,所以接下來就會呼叫doAcquireSharedInterruptibly方法,這個方法我們在講AQS的時候講過,它會將當前執行緒包裝成結點放入同步佇列尾部,並且有可能掛起執行緒。這也是當remaining小於0時執行緒會排隊阻塞的原因。
而如果返回的remaining>=0的話就代表當前執行緒獲取成功,因此tryAcquireShared(arg) < 0就為flase,所以就不會再去呼叫doAcquireSharedInterruptibly方法阻塞當前執行緒了。
以上是非公平獲取的整個邏輯,而公平獲取時僅僅是在此之前先去呼叫hasQueuedPredecessors方法判斷同步佇列是否有人在排隊,如果有的話就直接return -1表示獲取失敗,否則才繼續執行下面和非公平獲取一樣的步驟。
2、釋放許可證
// 釋放一個許可證
public void release() {
sync.releaseShared(1);
}
呼叫release方法是釋放一個許可證,它的操作很簡單,就呼叫了AQS的releaseShared方法,我們來看看這個方法。
// 釋放鎖的操作(共享模式)
public final boolean releaseShared(int arg) {
// 1.嘗試去釋放鎖
if (tryReleaseShared(arg)) {
// 2.如果釋放成功就喚醒其他執行緒
doReleaseShared();
return true;
}
return false;
}
AQS的releaseShared方法首先呼叫tryReleaseShared方法嘗試釋放鎖,這個方法的實現邏輯在子類Sync裡面。
abstract static class Sync extends AbstractQueuedSynchronizer {
...
// 嘗試釋放操作
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取當前同步狀態
int current = getState();
// 將當前同步狀態加上傳入的引數
int next = current + releases;
// 如果相加結果小於當前同步狀態的話就報錯
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
// 以CAS方式更新同步狀態的值, 更新成功則返回true, 否則繼續迴圈
if (compareAndSetState(current, next)) {
return true;
}
}
}
...
}
可以看到tryReleaseShared方法裡面採用for迴圈進行自旋,首先獲取同步狀態,將同步狀態加上傳入的引數,然後以CAS方式更新同步狀態,更新成功就返回true並跳出方法,否則就繼續迴圈直到成功為止,這就是Semaphore釋放許可證的流程。