1. 程式人生 > 其它 >Semaphore 的使用與原始碼解析

Semaphore 的使用與原始碼解析

使用

訊號量(Semaphore)允許多個執行緒同時訪問臨界區資源,而 ReentrantLock 這類鎖同時只允許一個執行緒訪問臨界區資源。

Semaphore 就是共享鎖,它的構造方法可以傳一個許可數 permits,表示臨界區資源數量,多少個資源同時也最多隻能由多少個執行緒來訪問。當 permits 等於 1 時,Semaphore 就等價於 ReentrantLock 這類互斥鎖。

程式碼示例:

@Test
public void test() throws InterruptedException {
    // 定義一個執行緒池
    ExecutorService executor = Executors.newFixedThreadPool(3);
    int userNuber = 10;
    CountDownLatch countDownLatch = new CountDownLatch(userNuber);
    // 三個許可,表示資源總數,這裡表示只有三本書籍,所以最多隻能由三個人借閱,即最多隻能有三個執行緒獲取到鎖
    int permits = 3;
    Semaphore semaphore = new Semaphore(permits);

    IntStream.range(0, userNuber).forEach(i -> {
        executor.submit(() -> {
            try {
                System.out.println("學生" + i + "等待借閱書籍......");

                // 獲取一個許可(加鎖)
                semaphore.acquire(1);

                System.out.println("學生" + i + "借閱到了一個本書籍,當前還剩餘" + semaphore.availablePermits() + "本書籍");
                Thread.sleep(i * 500);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 釋放一個許可(解鎖)
                semaphore.release(1);
                System.out.println("學生" + i + "歸還了一個本書籍,當前還剩餘" + semaphore.availablePermits() + "本書籍");
                countDownLatch.countDown();
            }
        });
    });

    countDownLatch.await();
    executor.shutdown();

    /**
         * 執行結果:
         *
         * 學生0等待借閱書籍......
         * 學生0借閱到了一個本書籍,當前還剩餘2本書籍
         * 學生1等待借閱書籍......
         * 學生1借閱到了一個本書籍,當前還剩餘1本書籍
         * 學生2等待借閱書籍......
         * 學生2借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生0歸還了一個本書籍,當前還剩餘1本書籍
         * 學生3等待借閱書籍......
         * 學生3借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生1歸還了一個本書籍,當前還剩餘1本書籍
         * 學生4等待借閱書籍......
         * 學生4借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生2歸還了一個本書籍,當前還剩餘1本書籍
         * 學生5等待借閱書籍......
         * 學生5借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生3歸還了一個本書籍,當前還剩餘1本書籍
         * 學生6等待借閱書籍......
         * 學生6借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生4歸還了一個本書籍,當前還剩餘1本書籍
         * 學生7等待借閱書籍......
         * 學生7借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生5歸還了一個本書籍,當前還剩餘1本書籍
         * 學生8等待借閱書籍......
         * 學生8借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生6歸還了一個本書籍,當前還剩餘1本書籍
         * 學生9等待借閱書籍......
         * 學生9借閱到了一個本書籍,當前還剩餘0本書籍
         * 學生7歸還了一個本書籍,當前還剩餘1本書籍
         * 學生8歸還了一個本書籍,當前還剩餘2本書籍
         * 學生9歸還了一個本書籍,當前還剩餘3本書籍
         */
}

原始碼解析

Sync 內部類

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // Semaphore 建構函式的傳參 permits 賦值給 state
    Sync(int permits) {
        setState(permits);
    }

    // 獲取許可數(即 state 的值)
    final int getPermits() {
        return getState();
    }

    // 非公平鎖方式獲取共享鎖
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 當前可用的許可數
            int available = getState();
            // 當前申請鎖後剩餘的許可數
            int remaining = available - acquires;
            // 如果剩餘許可數小於 0 直接返回剩餘許可數
            // 如果剩餘許可數大於 0,將其設定為 state,如果成功,則返回剩餘許可數
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // 釋放鎖
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 當前可用的許可數
            int current = getState();
            // (釋放鎖後)增加許可數
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 將增加後的許可數賦值給 state,成功則返回,否則自旋重試
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 減少許可數
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    // 清空許可數
    final int drainPermits() {
        for (;;) {
            // 當前許可數
            int current = getState();
            // 將 state 設定為 0
            if (current == 0 || compareAndSetState(current, 0))
                // 返回當前許可數
                return current;
        }
    }
}

NonfairSync 內部類

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

FairSync 內部類

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果當前執行緒節點還有前序節點,則獲取鎖失敗
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

建構函式

// 預設使用非公平鎖
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// permits 表示同時允許訪問臨界區資源的執行緒數
// fair 表示使用公平鎖實現還是非公平鎖實現
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire

// 可中斷地阻塞獲取鎖
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireUninterruptibly

// 不可中斷地阻塞獲取鎖
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

tryAcquire

public boolean tryAcquire() {
    // nonfairTryAcquireShared 返回剩餘可用許可數
    // 剩餘可用許可數大於等於 0,表示可以獲取鎖
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

release

// 釋放鎖(一個許可)
public void release() {
    sync.releaseShared(1);
}

acquire

// 一次獲取 permits 個許可(可中斷,阻塞獲取)
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

acquireUninterruptibly

// 一次獲取 permits 個許可(不可中斷、阻塞獲取)
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

tryAcquire

// 一次獲取 permits 個許可(可中斷,非阻塞獲取)
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

// 一次獲取 permits 個許可(可中斷,非阻塞獲取、超時可退出)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

release

// 釋放多個許可
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}