JUC原始碼分析11-locks-Semaphore
阿新 • • 發佈:2019-02-03
Semaphore不明白為什麼直接放在juc包下,不是應該放locks下面嘛,這裡還是當初locks學習吧。
Semaphore英文是訊號量的意思,在這裡我喜歡叫資源或者許可,實現的功能就是獲取資源,獲取到就幹活,獲取不到就排隊,等別人釋放了資源,然後所有排隊的再去獲取。實現AQS的共享api,看個入門demo:
public class SemaphoreTest { //3個鑰匙 private static Semaphore semaphore = new Semaphore(3); public static void main(String[] args) throws InterruptedException { for(int i =0; i<10; i++){ new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread() + "得到一把鑰匙"); //模擬幹些事情,要不然控制檯看不出效果 Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally{ semaphore.release(); System.out.println(Thread.currentThread() + "丟掉一把鑰匙"); } } }).start(); } //不是佔用鎖的執行緒也可以release // semaphore.release(4); // Thread.sleep(4000); // System.out.println(semaphore.availablePermits()); } }
另外Semaphore也支援公平和非公平,區別跟ReetrantLock的公平非公平差不多,非公平就是獲取的時候有可用的就插隊,公平的就老老實實排隊。不過lock的release必須是持有鎖的執行緒去release,而Semaphore不是。
看下內部對AQS的實現:
看下sync的公平和非公平的子類實現:abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; //AQS的state儲存許可的數量 Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } //非公平的Acquire final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; //許可量小於0就排隊,要不然就cas設定返回可用的數量,肯定是大於0,不用排隊 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //對AQS共享模式tryReleaseShard的release protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //這是減少許可量, //舉例說我專案要求有5個幹活小弟,但是老大說人太多,只能給你3個人,好吧,那就減少2個吧 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //這裡是獲取所有可用的許可量 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
看下Semaphore的實現:tatic final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; //父類設定state許可量 NonfairSync(int permits) { super(permits); } //非公平的共享Acquire,呼叫分類實現 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; //父類設定state許可量 FairSync(int permits) { super(permits); } //公平的共享Acquire protected int tryAcquireShared(int acquires) { for (;;) { //這裡是區分,看下pre是否有非自己執行緒排隊的,非公平的沒有這一步 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
//傳入許可量,呼叫sync設定AQS的state的值,預設非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
許可量跟是否公平標識
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
呼叫AQS響應中斷的Acquire
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
這個不響應中斷的Acquire
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
直接呼叫sync的非公平Acquire,如果你構造的時候使用的是公平模式,肯定會打破公平
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
響應中斷跟超時的Acquire
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
直接release一個許可
*/
public void release() {
sync.releaseShared(1);
}
/**
響應Acquire指定數量的許可
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
不響應中斷的Acquire指定數量的許可
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
/**
非公平的Acquire指定數量的許可
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
響應中斷和超時的Acquire指定數量的許可
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
release指定數量的許可
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
/**
查詢許可量
*/
public int availablePermits() {
return sync.getPermits();
}
/**
Acquire所有可用的許可並返回許可量
*/
public int drainPermits() {
return sync.drainPermits();
}
/**
扣減指定數量的許可,會導致許可量為負數,使用的時候注意,自己可以定義個子類看看
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
/**
驗證是否是公平
*/
public boolean isFair() {
return sync instanceof FairSync;
}
/**
呼叫AQS檢查佇列是否還有等待節點
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
返回AQS中節點數量
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
返回AQS同步等待佇列所有等待Acquire的數量
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
Semaphore還是比較簡單的,因為實現的是共享模式API,所以不管lock還是lock的執行緒都可以release,另外感覺收縮許可量的時候可能會導致許可量為負,使用的時候需要注意。