高併發學習筆記(八)
阿新 • • 發佈:2019-04-10
四、同步器的實現(四)
4.Semaphore
Semaphore是一個訊號計數量,用於維護一個資源的使用許可量,如果維護的資源使用許可超出會阻塞每一個請求該資源的許可(acquire),直到又有可用的。Semaphore通常用於限制訪問資源的執行緒數目,下面是一個使用示例:
/** * Semaphore使用示例,Semaphore代表廁所坑位 * Created by bzhang on 2019/3/21. */ public class TestSemaphore { private Semaphore lock = new Semaphore(3); //廁所坑位 public void goToToilet(){ try { lock.acquire(); //嘗試獲取一個位置 System.out.println(Thread.currentThread().getName()+"正在使用廁所"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"上完廁所,很開心"); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.release(); //釋放一個位置 } } public static void main(String[] args) { TestSemaphore test = new TestSemaphore(); ExecutorService pool = Executors.newCachedThreadPool(); //快取執行緒池 for (int i = 0;i < 11;i++){ pool.submit(new Runnable() { @Override public void run() { test.goToToilet(); } }); } pool.shutdown(); //關閉執行緒池 } }
Semaphore的使用與ReentrantLock和synchronized的使用很相似,只是鎖的使用一般都是獨佔的,即一次只允許一個執行緒執行。而Semaphore則是許可多個執行緒進行訪問,當Semaphore只許可一個執行緒訪問時,也就退化成鎖。
Semaphore的底層也是由AQS同步器實現的:
public class Semaphore implements java.io.Serializable { //AQS同步的佇列引用 private final Sync sync; //由構造方法可以看出Semaphore也分為公平模式和非公平模式 public Semaphore(int permits) { sync = new NonfairSync(permits); } //根據fair建立公平同步佇列或是非公平同步佇列 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } }
Semaphore的繼承關係如下圖,是由三個內部類Sync,NonfairSync與FairSync共同來實現Semaphore的訊號計數功能。
下面先來看公平與非公平模式的實現原始碼:
//非公平模式的實現 static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } //重寫的實AQS中的tryAcquireShared方法,說明使用的是共享模式 //嘗試獲取鎖 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); //呼叫父類的非公平模式下嘗試獲取鎖方法 } } //公平模式的實現 static final class FairSync extends Sync { FairSync(int permits) { super(permits); } //嘗試獲取鎖 protected int tryAcquireShared(int acquires) { for (;;) { //自旋 //同步佇列中是否存在排在當前執行緒前面的執行緒結點 //也就是說當前執行緒是不是等待最久的執行緒,不是的話,直接不讓獲取鎖 if (hasQueuedPredecessors()) return -1; int available = getState(); //獲取同步狀態 int remaining = available - acquires; //將要更新的同步狀態值 //remaining小於0,說明沒有可用的資源了,返回獲取失敗 //remaining大於等於0且CAS方式更新狀態失敗,則表示有可用資源,繼續迴圈嘗試更新直到成功 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } //公平鎖與非公平鎖父類 abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } //獲取剩餘許可資源數 final int getPermits() { return getState(); } //非公平嘗試獲取鎖,與公平模式基本相似,只是不需要判斷當前執行緒是不是等待最久的執行緒 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //釋放鎖資源 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) //超出最大許可值,拋異常 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //減少許可數,即假設當前許可數是5,呼叫該方法會時許可數變為5-reductions final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //獲取可立即使用的許可數 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
瞭解了底層實現,再看看Semaphore獲取許可和釋放許可的過程:
//獲取一個許可,在提供一個許可前一直將執行緒阻塞,或執行緒被中斷
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//AQS中的方法,可被中斷的獲取鎖的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared呼叫的是公平鎖FairSync或NonfairSync中的重寫方法
//tryAcquireShared可能失敗,進入doAcquireSharedInterruptibly方法進行自旋或加入同步佇列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//嘗試獲取一個許可,在有可用的許可前將其阻塞,且不可被中斷
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//AQS中的方法
public final void acquireShared(int arg) {
//嘗試獲取許可失敗的話,會進行自旋或加入到同步佇列中等待獲取鎖
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg); //AQS中已分析過,不再深入
}
//嘗試獲取一個許可
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0; //只進行一次方式,失敗後直接返回+
}
//在一定時間內嘗試獲取許可
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//AQS中的在一定時間內嘗試獲取鎖的方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//本質上還是先呼叫tryAcquireShared方法,不成功再在一定時間去自旋或加入同步佇列等待獲取許可
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
//獲取多個許可,在提供permits許可前一直將執行緒阻塞,或執行緒被中斷
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//獲取給定數目的許可,在提供這些許可前一直將執行緒阻塞,不可被中斷
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
//嘗試獲取給定數目的許可,值嘗試一次,失敗直接返回
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
//在給定的時間內嘗試獲取給定數目的許可數,超時返回失敗
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
//釋放許可的過程
//釋放一個許可
public void release() {
sync.releaseShared(1);
}
//AQS中的方法
public final boolean releaseShared(int arg) {
//嘗試釋放一個許可,不成功則進入doReleaseShared繼續嘗試釋放許可
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//釋放給定數目的許可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}