Semaphore源碼分析
Semaphore有兩種模式,公平模式和非公平模式。公平模式就是調用acquire的順序就是獲取許可證的順序,遵循FIFO;而非公平模式是搶占式的,也就是有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程。
構造方法
Semaphore有兩個構造方法,如下:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
從上面可以看到兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,默認非公平模式。
Semaphore內部基於AQS的共享模式,所以實現都委托給了Sync類。
這裏就看一下NonfairSync的構造方法:
NonfairSync(int permits) { super(permits); }
可以看到直接調用了父類的構造方法,Sync的構造方法如下:
Sync(int permits) { setState(permits); }
可以看到調用了setState方法,也就是說AQS中的資源就是許可證的數量。
獲取許可
先從獲取一個許可看起,並且先看非公平模式下的實現。首先看acquire方法,acquire方法有幾個重載,但主要是下面這個方法
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
從上面可以看到,調用了Sync的acquireSharedInterruptibly方法,該方法在父類AQS中,如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //如果線程被中斷了,拋出異常 if (Thread.interrupted()) throw new InterruptedException(); //獲取許可失敗,將線程加入到等待隊列中 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
AQS子類如果要使用共享模式的話,需要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
該方法調用了父類中的nonfairTyAcquireShared方法,如下:
final int nonfairTryAcquireShared(int acquires) { for (;;) { //獲取剩余許可數量 int available = getState(); //計算給完這次許可數量後的個數 int remaining = available - acquires; //如果許可不夠或者可以將許可數量重置的話,返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
從上面可以看到,只有在許可不夠時返回值才會小於0,其余返回的都是剩余許可數量,這也就解釋了,一旦許可不夠,後面的線程將會阻塞。看完了非公平的獲取,再看下公平的獲取,代碼如下:
protected int tryAcquireShared(int acquires) { for (;;) { //如果前面有線程再等待,直接返回-1 if (hasQueuedPredecessors()) return -1; //後面與非公平一樣 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
從上面可以看到,FairSync與NonFairSync的區別就在於會首先判斷當前隊列中有沒有線程在等待,如果有,就老老實實進入到等待隊列;而不像NonfairSync一樣首先試一把,說不定就恰好獲得了一個許可,這樣就可以插隊了。
看完了獲取許可後,再看一下釋放許可。
釋放許可
釋放許可也有幾個重載方法,但都會調用下面這個帶參數的方法,
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
releaseShared方法在AQS中,如下:
public final boolean releaseShared(int arg) { //如果改變許可數量成功 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
AQS子類實現共享模式的類需要實現tryReleaseShared類來判斷是否釋放成功,實現如下:
protected final boolean tryReleaseShared(int releases) { for (;;) { //獲取當前許可數量 int current = getState(); //計算回收後的數量 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS改變許可數量成功,返回true if (compareAndSetState(current, next)) return true; } }
從上面可以看到,一旦CAS改變許可數量成功,那麽就會調用doReleaseShared()方法釋放阻塞的線程。
減小許可數量
Semaphore還有減小許可數量的方法,該方法可以用於用於當資源用完不能再用時,這時就可以減小許可證。代碼如下:
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
可以看到,委托給了Sync,Sync的reducePermits方法如下:
final void reducePermits(int reductions) { for (;;) { //得到當前剩余許可數量 int current = getState(); //得到減完之後的許可數量 int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); //如果CAS改變成功 if (compareAndSetState(current, next)) return; } }
從上面可以看到,就是CAS改變AQS中的state變量,因為該變量代表許可證的數量。
獲取剩余許可數量
Semaphore還可以一次將剩余的許可數量全部取走,該方法是drain方法,如下:
public int drainPermits() { return sync.drainPermits(); }
Sync的實現如下:
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }
可以看到,就是CAS將許可數量置為0。
總結
Semaphore是信號量,用於管理一組資源。其內部是基於AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,線程將會被掛起;而一旦有一個線程釋放一個資源,那麽就有可能重新喚醒等待隊列中的線程繼續執行。
Semaphore源碼分析