1. 程式人生 > >JUC原始碼分析11-locks-Semaphore

JUC原始碼分析11-locks-Semaphore

Semaphore不明白為什麼直接放在juc包下,不是應該放locks下面嘛,這裡還是當初locks學習吧。

Semaphore英文是訊號量的意思,在這裡我喜歡叫資源或者許可,實現的功能就是獲取資源,獲取到就幹活,獲取不到就排隊,等別人釋放了資源,然後所有排隊的再去獲取。實現AQS的共享api,看個入門demo:

public class SemaphoreTest {

    //3個鑰匙
    private static Semaphore semaphore = new Semaphore(3);
    
    public static void main(String[] args) throws InterruptedException {

        for(int i =0; i<10; i++){
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread() + "得到一把鑰匙");
                        //模擬幹些事情,要不然控制檯看不出效果
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally{
                        semaphore.release();
                        System.out.println(Thread.currentThread() + "丟掉一把鑰匙");
                    }
                }
            }).start();
        }
//不是佔用鎖的執行緒也可以release
//        semaphore.release(4);
//        Thread.sleep(4000);
//        System.out.println(semaphore.availablePermits());
    }
}

另外Semaphore也支援公平和非公平,區別跟ReetrantLock的公平非公平差不多,非公平就是獲取的時候有可用的就插隊,公平的就老老實實排隊。不過lock的release必須是持有鎖的執行緒去release,而Semaphore不是。

看下內部對AQS的實現:

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
	//AQS的state儲存許可的數量
    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }
	//非公平的Acquire
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            //許可量小於0就排隊,要不然就cas設定返回可用的數量,肯定是大於0,不用排隊
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
	//對AQS共享模式tryReleaseShard的release
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
	//這是減少許可量,
	//舉例說我專案要求有5個幹活小弟,但是老大說人太多,只能給你3個人,好吧,那就減少2個吧
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
	//這裡是獲取所有可用的許可量
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
看下sync的公平和非公平的子類實現:
tatic final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

	//父類設定state許可量
    NonfairSync(int permits) {
        super(permits);
    }
	//非公平的共享Acquire,呼叫分類實現
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
	//父類設定state許可量
    FairSync(int permits) {
        super(permits);
    }
	//公平的共享Acquire
    protected int tryAcquireShared(int acquires) {
        for (;;) {
        	//這裡是區分,看下pre是否有非自己執行緒排隊的,非公平的沒有這一步
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
看下Semaphore的實現:
//傳入許可量,呼叫sync設定AQS的state的值,預設非公平
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
/**
許可量跟是否公平標識
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
呼叫AQS響應中斷的Acquire
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
/**
這個不響應中斷的Acquire 
 */
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
/**
直接呼叫sync的非公平Acquire,如果你構造的時候使用的是公平模式,肯定會打破公平
 */
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
響應中斷跟超時的Acquire
 */
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
直接release一個許可
 */
public void release() {
    sync.releaseShared(1);
}
/**
響應Acquire指定數量的許可
 */
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
/**
不響應中斷的Acquire指定數量的許可
 */
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
/**
非公平的Acquire指定數量的許可
 */
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
響應中斷和超時的Acquire指定數量的許可
 */
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
release指定數量的許可
 */
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
/**
查詢許可量
 */
public int availablePermits() {
    return sync.getPermits();
}
/**
Acquire所有可用的許可並返回許可量
 */
public int drainPermits() {
    return sync.drainPermits();
}
/**
扣減指定數量的許可,會導致許可量為負數,使用的時候注意,自己可以定義個子類看看
 */
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}
/**
驗證是否是公平
 */
public boolean isFair() {
    return sync instanceof FairSync;
}
/**
呼叫AQS檢查佇列是否還有等待節點
 */
public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}
/**
返回AQS中節點數量
 */
public final int getQueueLength() {
    return sync.getQueueLength();
}
/**
返回AQS同步等待佇列所有等待Acquire的數量
 */
protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}

Semaphore還是比較簡單的,因為實現的是共享模式API,所以不管lock還是lock的執行緒都可以release,另外感覺收縮許可量的時候可能會導致許可量為負,使用的時候需要注意。