二十九、併發程式設計之併發工具類Semaphore詳解
一、簡介
Semaphore是一個計數訊號量,常用於限制可以訪問某些資源(物理或邏輯的)執行緒數目。
Semaphore是計數訊號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可證這個物件,Semaphore只是維持了一個可獲得許可證的數量。
Semaphore經常用於限制獲取某種資源的執行緒數量。
二、原理
1、構造方法
Semaphore有兩個構造方法,如下:
public Semaphore (int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
從上面可以看到兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,預設非公平模式。
Semaphore內部基於AQS的共享模式,所以實現都委託給了Sync類。
這裡就看一下NonfairSync的構造方法:
Sync(int permits) {
setState(permits);
}
可以看到呼叫了setState方法,也就是說AQS中的資源就是許可證的數量。
2、獲取許可
先從獲取一個許可看起,並且先看非公平模式下的實現。首先看acquire方法,acquire方法有幾個過載,但主要是下面這個方法
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly (permits);
}
從上面可以看到,呼叫了Sync的acquireSharedInterruptibly方法,該方法在父類AQS中,如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//如果執行緒被中斷了,丟擲異常
if (Thread.interrupted())
throw new InterruptedException();
//獲取許可失敗,將執行緒加入到等待佇列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
AQS子類如果要使用共享模式的話,需要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
該方法呼叫了父類中的nonfairTyAcquireShared方法,如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取剩餘許可數量
int available = getState();
//計算給完這次許可數量後的個數
int remaining = available - acquires;
//如果許可不夠或者可以將許可數量重置的話,返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
從上面可以看到,只有在許可不夠時返回值才會小於0,其餘返回的都是剩餘許可數量,這也就解釋了,一旦許可不夠,後面的執行緒將會阻塞。看完了非公平的獲取,再看下公平的獲取,程式碼如下:
protected int tryAcquireShared(int acquires) {
for (;;) {
//如果前面有執行緒再等待,直接返回-1
if (hasQueuedPredecessors())
return -1;
//後面與非公平一樣
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
從上面可以看到,FairSync與NonFairSync的區別就在於會首先判斷當前佇列中有沒有執行緒在等待,如果有,就老老實實進入到等待佇列;而不像NonfairSync一樣首先試一把,說不定就恰好獲得了一個許可,這樣就可以插隊了。
看完了獲取許可後,再看一下釋放許可。
3、釋放許可
釋放許可也有幾個過載方法,但都會呼叫下面這個帶引數的方法,
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
releaseShared方法在AQS中,如下:
public final boolean releaseShared(int arg) {
//如果改變許可數量成功
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
AQS子類實現共享模式的類需要實現tryReleaseShared類來判斷是否釋放成功,實現如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取當前許可數量
int current = getState();
//計算回收後的數量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS改變許可數量成功,返回true
if (compareAndSetState(current, next))
return true;
}
}
從上面可以看到,一旦CAS改變許可數量成功,那麼就會呼叫doReleaseShared()方法釋放阻塞的執行緒。
減小許可數量。
Semaphore還有減小許可數量的方法,該方法可以用於用於當資源用完不能再用時,這時就可以減小許可證。程式碼如下:
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
可以看到,委託給了Sync,Sync的reducePermits方法如下:
final void reducePermits(int reductions) {
for (;;) {
//得到當前剩餘許可數量
int current = getState();
//得到減完之後的許可數量
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//如果CAS改變成功
if (compareAndSetState(current, next))
return;
}
}
從上面可以看到,就是CAS改變AQS中的state變數,因為該變數代表許可證的數量。
獲取剩餘許可數量
Semaphore還可以一次將剩餘的許可數量全部取走,該方法是drain方法,如下:
public int drainPermits() {
return sync.drainPermits();
}
Sync的實現如下:
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
可以看到,就是CAS將許可數量置為0。
三、使用
操場上有5個跑道,一個跑道一次只能有一個學生在上面跑步,一旦所有跑道在使用,那麼後面的學生就需要等待,直到有一個學生不跑了。
/**
* 操場,有5個跑道
*/
public class Playground {
/**
* 跑道類
*/
static class Track {
private int num;
public Track(int num) {
this.num = num;
}
@Override
public String toString() {
return "Track{" +
"num=" + num +
'}';
}
}
private Track[] tracks = {new Track(1), new Track(2), new Track(3), new Track(4), new Track(5)};
private volatile boolean[] used = new boolean[5];
private Semaphore semaphore = new Semaphore(5, true);
/**
* 獲取一個跑道
*/
public Track getTrack() throws InterruptedException {
semaphore.acquire(1);
return getNextAvailableTrack();
}
/**
* 返回一個跑道
*/
public void releaseTrack(Track track) {
if (makeAsUsed(track))
semaphore.release(1);
}
/**
* 遍歷,找到一個沒人用的跑道
*/
private Track getNextAvailableTrack() {
for (int i = 0; i < used.length; i++) {
if (!used[i]) {
used[i] = true;
return tracks[i];
}
}
return null;
}
/**
* 返回一個跑道
*/
private boolean makeAsUsed(Track track) {
for (int i = 0; i < used.length; i++) {
if (tracks[i] == track) {
if (used[i]) {
used[i] = false;
return true;
} else {
return false;
}
}
}
return false;
}
}
建立了5個跑道物件,並使用一個boolean型別的陣列記錄每個跑道是否被使用了,初始化了5個許可證的Semaphore,在獲取跑道時首先呼叫acquire(1)獲取一個許可證,在歸還一個跑道是呼叫release(1)釋放一個許可證。接下來再看啟動程式:
public class SemaphoreDemo {
static class Student implements Runnable {
private int num;
private Playground playground;
public Student(int num, Playground playground) {
this.num = num;
this.playground = playground;
}
@Override
public void run() {
try {
//獲取跑道
Playground.Track track = playground.getTrack();
if (track != null) {
System.out.println("學生" + num + "在" + track.toString() + "上跑步");
TimeUnit.SECONDS.sleep(2);
System.out.println("學生" + num + "釋放" + track.toString());
//釋放跑道
playground.releaseTrack(track);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool();
Playground playground = new Playground();
for (int i = 0; i < 100; i++) {
executor.execute(new Student(i+1,playground));
}
}
}
四、總結
Semaphore是訊號量,用於管理一組資源。其內部是基於AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,執行緒將會被掛起;而一旦有一個執行緒釋放一個資源,那麼就有可能重新喚醒等待佇列中的執行緒繼續執行。