AQS系列(六)- Semaphore的使用及原理
前言
Semaphore也是JUC包中一個用於併發控制的工具類,舉個常用場景的例子:有三臺電腦五個人,每個人都要用電腦註冊一個自己的賬戶,這時最開始只能同時有三個人操作電腦註冊賬戶,這三個人中有人操作完了剩下的兩個人才能佔用電腦註冊自己的賬戶。這就是Semaphore的經典使用場景,跟併發加鎖有點像,只是我們的併發加鎖同一時間只讓有一個執行緒執行,而Semaphore的加鎖控制是允許同一時間有指定數量的執行緒同時執行,超過這個數量就加鎖控制。
一、使用樣例
1 public static void main(String[] args) { 2 Semaphore semaphore = new Semaphore(3); // 對比上面例子中的3臺電腦 3 for (int i = 0; i < 5; i++) { // 對比上面例子中的5個人 4 new Thread(() -> { 5 try { 6 semaphore.acquire(1); // 注意acquire中的值可以傳任意值>=0的整數 7 } catch (InterruptedException e) { 8 e.printStackTrace(); 9 } 10 System.out.println(Thread.currentThread().getName() + " acquire 1"); 11 try { 12 Thread.sleep(2000); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 System.out.println(Thread.currentThread().getName() + "release 1"); 17 semaphore.release(1); 18 }).start(); 19 } 20 }
執行結果為:
1 Thread-0 acquire 1 2 Thread-2 acquire 1 3 Thread-1 acquire 1 4 Thread-1release 1 5 Thread-2release 1 6 Thread-0release 1 7 Thread-4 acquire 1 8 Thread-3 acquire 1 9 Thread-4release 1 10 Thread-3release 1
可以看到同一時間只有三個執行緒獲取到了鎖,這三個執行完釋放了之後,剩下兩個菜獲取鎖執行。下面看看原始碼是如何實現的。
二、原始碼實現
1、Semaphore構造器
1 public Semaphore(int permits) { 2 sync = new NonfairSync(permits); 3 } 4 5 public Semaphore(int permits, boolean fair) { 6 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 7 }
可以看到,Semaphore有兩個構造器,一個是隻傳數值預設非公平鎖,另一個可指定用公平鎖還是非公平鎖。permits最終還是賦值給了AQS中的state變數。
2、acquire(1)方法
1 public void acquire(int permits) throws InterruptedException { 2 if (permits < 0) throw new IllegalArgumentException(); 3 sync.acquireSharedInterruptibly(permits); 4 }
此方法同樣呼叫了AQS中的模板方法:
1 public final void acquireSharedInterruptibly(int arg) 2 throws InterruptedException { 3 if (Thread.interrupted()) 4 throw new InterruptedException(); 5 if (tryAcquireShared(arg) < 0) 6 doAcquireSharedInterruptibly(arg); 7 }
1)、檢視tryAcquireShared的實現方法
先看非公平鎖的獲取:
1 final int nonfairTryAcquireShared(int acquires) { 2 for (;;) { 3 int available = getState(); 4 int remaining = available - acquires; // 如果remaining是負的,說明當前剩餘的訊號量不夠了,需要阻塞 5 if (remaining < 0 || 6 compareAndSetState(available, remaining)) // 如果remaining<0則直接return,不會走CAS;如果大於0,說明訊號量還夠,可走CAS將訊號量減掉,成功則返回大於0的remaining 7 return remaining; 8 } 9 }
再看公平鎖的獲取:
1 protected int tryAcquireShared(int acquires) { 2 for (;;) { 3 if (hasQueuedPredecessors()) // 判斷是不是在隊首,不是的話直接返回-1 4 return -1; 5 int available = getState(); // 後面邏輯通非公平鎖的獲取邏輯 6 int remaining = available - acquires; 7 if (remaining < 0 || 8 compareAndSetState(available, remaining)) 9 return remaining; 10 } 11 }
可以看到,不管非公平鎖和公平鎖,加鎖時都是先判斷當前state夠不夠減的,如果減出負數返回獲取鎖失敗,是正數才走CAS將原訊號量扣掉,返回獲取鎖成功。加鎖時一個減state的過程。
2)、doAcquireSharedInterruptibly
此方法還是AQS中的實現,邏輯重複,就不再說明了。
3、release(1)方法
1 public void release(int permits) { 2 if (permits < 0) throw new IllegalArgumentException(); 3 sync.releaseShared(permits); 4 }
同樣呼叫了AQS中的模板方法releaseShared:
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) { 3 doReleaseShared(); 4 return true; 5 } 6 return false; 7 }
其中tryReleaseShared的實現在Semaphore類的Sync中,如下所示:
1 protected final boolean tryReleaseShared(int releases) { 2 for (;;) { 3 int current = getState(); 4 int next = current + releases; // 用當前state加上要釋放的releases 5 if (next < current) // overflow 6 throw new Error("Maximum permit count exceeded"); 7 if (compareAndSetState(current, next)) // 用CAS將state加上 8 return true; 9 } 10 }
另一個方法doReleaseShared之前看過,此處就不贅述了。
三、小結
Semaphore訊號量類基於AQS的共享鎖實現,有公平鎖和非公平鎖兩個版本。它的加鎖與釋放鎖的不同之處在於和普通的加鎖釋放鎖反著,ReentrantLock和ReentrantReadWriteLock中都是加鎖時state+1,釋放鎖時state-1,而Semaphore中是加鎖時state減,釋放鎖時state加。
另外,如果它還可以acquire(2) 、release(1),即獲取的和釋放的訊號量可以不一致,只是需要注意別釋放的訊號量太少導致後續任務獲取不到足夠的量而永久阻塞