Semaphore 的使用與原始碼解析
阿新 • • 發佈:2021-12-05
使用
訊號量(Semaphore)允許多個執行緒同時訪問臨界區資源,而 ReentrantLock 這類鎖同時只允許一個執行緒訪問臨界區資源。
Semaphore 就是共享鎖,它的構造方法可以傳一個許可數 permits,表示臨界區資源數量,多少個資源同時也最多隻能由多少個執行緒來訪問。當 permits 等於 1 時,Semaphore 就等價於 ReentrantLock 這類互斥鎖。
程式碼示例:
@Test public void test() throws InterruptedException { // 定義一個執行緒池 ExecutorService executor = Executors.newFixedThreadPool(3); int userNuber = 10; CountDownLatch countDownLatch = new CountDownLatch(userNuber); // 三個許可,表示資源總數,這裡表示只有三本書籍,所以最多隻能由三個人借閱,即最多隻能有三個執行緒獲取到鎖 int permits = 3; Semaphore semaphore = new Semaphore(permits); IntStream.range(0, userNuber).forEach(i -> { executor.submit(() -> { try { System.out.println("學生" + i + "等待借閱書籍......"); // 獲取一個許可(加鎖) semaphore.acquire(1); System.out.println("學生" + i + "借閱到了一個本書籍,當前還剩餘" + semaphore.availablePermits() + "本書籍"); Thread.sleep(i * 500); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放一個許可(解鎖) semaphore.release(1); System.out.println("學生" + i + "歸還了一個本書籍,當前還剩餘" + semaphore.availablePermits() + "本書籍"); countDownLatch.countDown(); } }); }); countDownLatch.await(); executor.shutdown(); /** * 執行結果: * * 學生0等待借閱書籍...... * 學生0借閱到了一個本書籍,當前還剩餘2本書籍 * 學生1等待借閱書籍...... * 學生1借閱到了一個本書籍,當前還剩餘1本書籍 * 學生2等待借閱書籍...... * 學生2借閱到了一個本書籍,當前還剩餘0本書籍 * 學生0歸還了一個本書籍,當前還剩餘1本書籍 * 學生3等待借閱書籍...... * 學生3借閱到了一個本書籍,當前還剩餘0本書籍 * 學生1歸還了一個本書籍,當前還剩餘1本書籍 * 學生4等待借閱書籍...... * 學生4借閱到了一個本書籍,當前還剩餘0本書籍 * 學生2歸還了一個本書籍,當前還剩餘1本書籍 * 學生5等待借閱書籍...... * 學生5借閱到了一個本書籍,當前還剩餘0本書籍 * 學生3歸還了一個本書籍,當前還剩餘1本書籍 * 學生6等待借閱書籍...... * 學生6借閱到了一個本書籍,當前還剩餘0本書籍 * 學生4歸還了一個本書籍,當前還剩餘1本書籍 * 學生7等待借閱書籍...... * 學生7借閱到了一個本書籍,當前還剩餘0本書籍 * 學生5歸還了一個本書籍,當前還剩餘1本書籍 * 學生8等待借閱書籍...... * 學生8借閱到了一個本書籍,當前還剩餘0本書籍 * 學生6歸還了一個本書籍,當前還剩餘1本書籍 * 學生9等待借閱書籍...... * 學生9借閱到了一個本書籍,當前還剩餘0本書籍 * 學生7歸還了一個本書籍,當前還剩餘1本書籍 * 學生8歸還了一個本書籍,當前還剩餘2本書籍 * 學生9歸還了一個本書籍,當前還剩餘3本書籍 */ }
原始碼解析
Sync 內部類
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // Semaphore 建構函式的傳參 permits 賦值給 state Sync(int permits) { setState(permits); } // 獲取許可數(即 state 的值) final int getPermits() { return getState(); } // 非公平鎖方式獲取共享鎖 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 當前可用的許可數 int available = getState(); // 當前申請鎖後剩餘的許可數 int remaining = available - acquires; // 如果剩餘許可數小於 0 直接返回剩餘許可數 // 如果剩餘許可數大於 0,將其設定為 state,如果成功,則返回剩餘許可數 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 釋放鎖 protected final boolean tryReleaseShared(int releases) { for (;;) { // 當前可用的許可數 int current = getState(); // (釋放鎖後)增加許可數 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 將增加後的許可數賦值給 state,成功則返回,否則自旋重試 if (compareAndSetState(current, next)) return true; } } // 減少許可數 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } // 清空許可數 final int drainPermits() { for (;;) { // 當前許可數 int current = getState(); // 將 state 設定為 0 if (current == 0 || compareAndSetState(current, 0)) // 返回當前許可數 return current; } } }
NonfairSync 內部類
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
FairSync 內部類
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果當前執行緒節點還有前序節點,則獲取鎖失敗
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
建構函式
// 預設使用非公平鎖
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// permits 表示同時允許訪問臨界區資源的執行緒數
// fair 表示使用公平鎖實現還是非公平鎖實現
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire
// 可中斷地阻塞獲取鎖
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireUninterruptibly
// 不可中斷地阻塞獲取鎖
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
tryAcquire
public boolean tryAcquire() {
// nonfairTryAcquireShared 返回剩餘可用許可數
// 剩餘可用許可數大於等於 0,表示可以獲取鎖
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
release
// 釋放鎖(一個許可)
public void release() {
sync.releaseShared(1);
}
acquire
// 一次獲取 permits 個許可(可中斷,阻塞獲取)
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
acquireUninterruptibly
// 一次獲取 permits 個許可(不可中斷、阻塞獲取)
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
tryAcquire
// 一次獲取 permits 個許可(可中斷,非阻塞獲取)
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
// 一次獲取 permits 個許可(可中斷,非阻塞獲取、超時可退出)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
release
// 釋放多個許可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}