剖析基於併發AQS的共享鎖的實現(基於訊號量Semaphore)
關聯文章:
上篇文章通過ReetrantLock分析了獨佔鎖模式的實現原理,即基於AQS同步框架,本篇打算從Semaphore入手分析共享鎖模式的實現原理,與獨佔鎖模式不同的是,共享鎖模式允許同一個時刻多個執行緒可獲取同步狀態。本篇的思路是先說明Semaphore的基本用法,再通過Semaphore的內部實現原理分析共享鎖的實現,實際上其內部也是基於AQS同步器實現的,在稍後我們將會看到這事實。如果想了解獨佔鎖模式在AQS內部的實現原理,可瀏覽博主的上一篇博文:深入剖析基於併發AQS的重入鎖(ReetrantLock)及其Condition實現原理,而以下是本篇的主要內容
訊號量-Semaphore
Semaphore共享鎖的使用
訊號量(Semaphore),又被稱為訊號燈,在多執行緒環境下用於協調各個執行緒, 以保證它們能夠正確、合理的使用公共資源。訊號量維護了一個許可集,我們在初始化Semaphore時需要為這個許可集傳入一個數量值,該數量值代表同一時間能訪問共享資源的執行緒數量。執行緒可以通過acquire()
方法獲取到一個許可,然後對共享資源進行操作,注意如果許可集已分配完了,那麼執行緒將進入等待狀態,直到其他執行緒釋放許可才有機會再獲取許可,執行緒釋放一個許可通過release()
方法完成。下面通過一個簡單案例來演示
public class SemaphoreTest {
public static void main(String[] args) {
// 執行緒池
ExecutorService exec = Executors.newCachedThreadPool();
//設定訊號量同時執行的執行緒數是5
final Semaphore semp = new Semaphore(5);
// 模擬20個客戶端訪問
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
//使用acquire()獲取鎖
semp.acquire();
System.out.println("Accessing: " + NO);
//睡眠1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
} finally {
//使用完成釋放鎖
semp.release();
}
}
};
exec.execute(run);
}
// 退出執行緒池
exec.shutdown();
}
}
上述程式碼中,在建立Semaphore時初始化5個許可,這也就意味著同一個時間點允許5個執行緒進行共享資源訪問,使用acquire()
方法為每個執行緒獲取許可,並進行休眠1秒,如果5個許可已被分配完,新到來的執行緒將進入等待狀態。如果執行緒順利完成操作將通過release()
方法釋放許可,我們執行程式碼,可以發現每隔1秒幾乎同一時間出現5條執行緒訪,如下圖
Semaphore實現互斥鎖
在初始化訊號量時傳入1,使得它在使用時最多隻有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進位制訊號量,因為它只能有兩種狀態:一個可用的許可或零個可用的許可。按此方式使用時,二進位制訊號量具有某種屬性(與很多 Lock 實現不同),即可以由執行緒釋放“鎖”,而不是由所有者(因為訊號量沒有所有權的概念)。下面簡單看一個Semaphore實現互斥功能的例子
/**
* Created by zejian on 2017/7/30.
* Blog : http://blog.csdn.net/javazejian [原文地址,請尊重原創]
*/
public class SemaphoreMutex {
//初始化為1,互斥訊號量
private final static Semaphore mutex = new Semaphore(1);
public static void main(String[] args){
ExecutorService pools = Executors.newCachedThreadPool();
for (int i=0 ; i < 10;i++){
final int index = i;
Runnable run = new Runnable() {
@Override
public void run() {
try {
mutex.acquire();
System.out.println(String.format("[Thread-%s]任務id --- %s",Thread.currentThread().getId(),index));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//使用完成釋放鎖
mutex.release();
System.out.println("-----------release");
}
}
};
pools.execute(run);
}
pools.shutdown();
}
}
建立一個數量為1的互斥訊號量Semaphore,然後併發執行10個執行緒,線上程中利用Semaphore控制執行緒的併發執行,因為訊號量數值只有1,因此每次只能一條執行緒執行,其他執行緒進入等待狀態,如下
除了獲取許可的aquire()方法和release()方法外,還提供了其他方法如下
//構造方法摘要
//建立具有給定的許可數和非公平的公平設定的Semaphore。
Semaphore(int permits)
//建立具有給定的許可數和給定的公平設定的Semaphore,true即為公平鎖
Semaphore(int permits, boolean fair)
//從此訊號量中獲取許可,不可中斷
void acquireUninterruptibly()
//返回此訊號量中當前可用的許可數。
int availablePermits()
//獲取並返回立即可用的所有許可。
int drainPermits()
//返回一個 collection,包含可能等待獲取的執行緒。
protected Collection<Thread> getQueuedThreads();
//返回正在等待獲取的執行緒的估計數目。
int getQueueLength()
//查詢是否有執行緒正在等待獲取。
boolean hasQueuedThreads()
//如果此訊號量的公平設定為 true,則返回 true。
boolean isFair()
//僅在呼叫時此訊號量存在一個可用許可,才從訊號量獲取許可。
boolean tryAcquire()
//如果在給定的等待時間內,此訊號量有可用的許可並且當前執行緒未被中斷,則從此訊號量獲取一個許可。
boolean tryAcquire(long timeout, TimeUnit unit)
Semaphore中共享鎖的實現
Semaphore的實現內部原理概要
在深入分析Semaphore的內部原理前先看看一張類圖結構
根據類圖可知,訊號量Semaphore的類結構與上一篇中分析的ReetrantLock的類結構幾乎如出一轍。Semaphore內部同樣存在繼承自AQS的內部類Sync以及繼承自Sync的公平鎖(FairSync)和非公平鎖(NofairSync),從這點也足以說明Semaphore的內部實現原理也是基於AQS併發元件的,在上一篇文章中,我們提到過,AQS是基礎元件,只負責核心併發操作,如加入或維護同步佇列,控制同步狀態,等,而具體的加鎖和解鎖操作交由子類完成,因此子類Semaphore共享鎖的獲取與釋放需要自己實現,這兩個方法分別是獲取鎖的tryAcquireShared(int arg)
方法和釋放鎖的tryReleaseShared(int arg)
方法,這點從Semaphore的內部結構完全可以看出來
從圖可知,Semaphore的內部類公平鎖(FairSync)和非公平鎖(NoFairSync)各自實現不同的獲取鎖方法即tryAcquireShared(int arg)
,畢竟公平鎖和非公平鎖的獲取稍後不同,而釋放鎖tryReleaseShared(int arg)
的操作交由Sync實現,因為釋放操作都是相同的,因此放在父類Sync中實現當然是最好的。需要明白的是,我們在呼叫Semaphore的方法時,其內部則是通過間接呼叫其內部類或AQS執行的。下面我們就從Semaphore的原始碼入手分析共享鎖實現原理,這裡先從非公平鎖入手。
非公平鎖中的共享鎖
Semaphore的建構函式如下
//預設建立公平鎖,permits指定同一時間訪問共享資源的執行緒數
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
顯然我們通過預設建構函式建立時,誕生的就是非公平鎖,
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
//呼叫父類Sync的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
顯然傳入的許可數permits傳遞給了父類,最終會傳給AQS中的state變數,也就是同步狀態的變數,如下
//AQS中控制同步狀態的state變數
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//對state變數進行CAS 操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
從這點可知,Semaphore的初始化值也就是state的初始化值。當我們呼叫Semaphore的acquire()方法後,執行過程是這樣的,當一個執行緒請求到來時,如果state值代表的許可數足夠使用,那麼請求執行緒將會獲得同步狀態即對共享資源的訪問權,並更新state的值(一般是對state值減1),但如果state值代表的許可數已為0,則請求執行緒將無法獲取同步狀態,執行緒將被加入到同步佇列並阻塞,直到其他執行緒釋放同步狀態(一般是對state值加1)才可能獲取對共享資源的訪問權。呼叫Semaphore的acquire()
方法後將會呼叫到AQS的acquireSharedInterruptibly()
如下
//Semaphore的acquire()
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 注意Sync類繼承自AQS
* AQS的acquireSharedInterruptibly()方法
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判斷是否中斷請求
if (Thread.interrupted())
throw new InterruptedException();
//如果tryAcquireShared(arg)不小於0,則執行緒獲取同步狀態成功
if (tryAcquireShared(arg) < 0)
//未獲取成功加入同步佇列等待
doAcquireSharedInterruptibly(arg);
}
從方法名就可以看出該方法是可以中斷的,也就是說Semaphore的acquire()
方法也是可中斷的。在acquireSharedInterruptibly()
方法內部先進行了執行緒中斷的判斷,如果沒有中斷,那麼先嚐試呼叫tryAcquireShared(arg)
方法獲取同步狀態,如果獲取成功,那麼方法執行結束,如果獲取失敗呼叫doAcquireSharedInterruptibly(arg);
方法加入同步佇列等待。這裡的tryAcquireShared(arg)
是個模板方法,AQS內部沒有提供具體實現,由子類實現,也就是有Semaphore內部自己實現,該方法在Semaphore內部非公平鎖的實現如下
//Semaphore中非公平鎖NonfairSync的tryAcquireShared()
protected int tryAcquireShared(int acquires) {
//呼叫了父類Sync中的實現方法
return nonfairTryAcquireShared(acquires);
}
//Syn類中
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
//使用死迴圈
for (;;) {
int available = getState();
int remaining = available - acquires;
//判斷訊號量是否已小於0或者CAS執行是否成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
nonfairTryAcquireShared(int acquires)
方法內部,先獲取state的值,並執行減法操作,得到remaining值,如果remaining不小於0,那麼執行緒獲取同步狀態成功,可訪問共享資源,並更新state的值,如果remaining大於0,那麼執行緒獲取同步狀態失敗,將被加入同步佇列(通過doAcquireSharedInterruptibly(arg)
),注意Semaphore的acquire()
可能存在併發操作,因此nonfairTryAcquireShared()
方法體內部採用無鎖(CAS)併發的操作保證對state值修改的安全性。如何嘗試獲取同步狀態失敗,那麼將會執行doAcquireSharedInterruptibly(int arg)
方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//建立共享模式的結點Node.SHARED,並加入同步佇列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//進入自旋操作
for (;;) {
final Node p = node.predecessor();
//判斷前驅結點是否為head
if (p == head) {
//嘗試獲取同步狀態
int r = tryAcquireShared(arg);
//如果r>0 說明獲取同步狀態成功
if (r >= 0) {
//將當前執行緒結點設定為頭結點並傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//調整同步佇列中node結點的狀態並判斷是否應該被掛起
//並判斷是否需要被中斷,如果中斷直接丟擲異常,當前結點請求也就結束
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
//結束該結點執行緒的請求
cancelAcquire(node);
}
}
在方法中,由於當前執行緒沒有獲取同步狀態,因此建立一個共享模式(Node.SHARED
)的結點並通過addWaiter(Node.SHARED)
加入同步佇列,加入完成後,當前執行緒進入自旋狀態,首先判斷前驅結點是否為head,如果是,那麼嘗試獲取同步狀態並返回r值,如果r大於0,則說明獲取同步狀態成功,將當前執行緒設定為head並傳播,傳播指的是,同步狀態剩餘的許可數值不為0,通知後續結點繼續獲取同步狀態,到此方法將會return結束,獲取到同步狀態的執行緒將會執行原定的任務。但如果前驅結點不為head或前驅結點為head並嘗試獲取同步狀態失敗,那麼呼叫shouldParkAfterFailedAcquire(p, node)
方法判斷前驅結點的waitStatus值是否為SIGNAL並調整同步佇列中的node結點狀態,如果返回true,那麼執行parkAndCheckInterrupt()
方法,將當前執行緒掛起並返回是否中斷執行緒的flag。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取當前結點的等待狀態
int ws = pred.waitStatus;
//如果為等待喚醒(SIGNAL)狀態則返回true
if (ws == Node.SIGNAL)
return true;
//如果ws>0 則說明是結束狀態,
//遍歷前驅結點直到找到沒有結束狀態的結點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果ws小於0又不是SIGNAL狀態,
//則將其設定為SIGNAL狀態,代表該結點的執行緒正在等待喚醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//將當前執行緒掛起
LockSupport.park(this);
//獲取執行緒中斷狀態,interrupted()是判斷當前中斷狀態,
//並非中斷執行緒,因此可能true也可能false,並返回
return Thread.interrupted();
}
到此,加入同步佇列的整個過程完成。這裡小結一下,在AQS中存在一個變數state,當我們建立Semaphore物件傳入許可數值時,最終會賦值給state,state的數值代表同一個時刻可同時操作共享資料的執行緒數量,每當一個執行緒請求(如呼叫Semaphored的acquire()方法)獲取同步狀態成功,state的值將會減少1,直到state為0時,表示已沒有可用的許可數,也就是對共享資料進行操作的執行緒數已達到最大值,其他後來執行緒將被阻塞,此時AQS內部會將執行緒封裝成共享模式的Node結點,加入同步佇列中等待並開啟自旋操作。只有當持有對共享資料訪問許可權的執行緒執行完成任務並釋放同步狀態後,同步佇列中的對於的結點執行緒才有可能獲取同步狀態並被喚醒執行同步操作,注意在同步佇列中獲取到同步狀態的結點將被設定成head並清空相關執行緒資料(畢竟執行緒已在執行也就沒有必要儲存資訊了),AQS通過這種方式便實現共享鎖,簡單模型如下
前面我們分析的是可中斷的請求,與只對應的不可中的的請求(這些方法都存在於AQS,由子類Semaphore間接呼叫)如下
//不可中的acquireShared()
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//沒有丟擲異常中的。。。。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//設定為頭結點
/*
* 嘗試去喚醒佇列中的下一個節點,如果滿足如下條件:
* 呼叫者明確表示"傳遞"(propagate > 0),
* 或者h.waitStatus為PROPAGATE(被上一個操作設定)
* 並且
* 下一個節點處於共享模式或者為null。
*
* 這兩項檢查中的保守主義可能會導致不必要的喚醒,但只有在有
* 有在多個執行緒爭取獲得/釋放同步狀態時才會發生,所以大多
* 數情況下會立馬獲得需要的訊號
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//喚醒後繼節點,因為是共享模式,所以允許多個執行緒同時獲取同步狀態
doReleaseShared();
}
}
顯然與前面帶中斷請求doAcquireSharedInterruptibly(int arg)
方法不同的是少執行緒中斷的判斷以及異常丟擲,其他操作都一樣,關於doReleaseShared()
,放後面分析。ok~,瞭解完請求同步狀態的過程,我們看看釋放請求狀態的過程,當每個執行緒執行完成任務將會釋放同步狀態,此時state值一般都會增加1。先從Semaphore的release()方法入手
//Semaphore的release()
public void release() {
sync.releaseShared(1);
}
//呼叫到AQS中的releaseShared(int arg)
public final boolean releaseShared(int arg) {
//呼叫子類Semaphore實現的tryReleaseShared方法嘗試釋放同步狀態
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
顯然Semaphore間接呼叫了AQS中的releaseShared(int arg)方法,通過tryReleaseShared(arg)
方法嘗試釋放同步狀態,如果釋放成功,那麼將呼叫doReleaseShared()
喚醒同步佇列中後繼結點的執行緒,tryReleaseShared(int releases)
方法如下
//在Semaphore的內部類Sync中實現的
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取當前state
int current = getState();
//釋放狀態state增加releases
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//通過CAS更新state的值
if (compareAndSetState(current, next))
return true;
}
}
邏輯很簡單,釋放同步狀態,更新state的值,值得注意的是這裡必須操作無鎖操作,即for死迴圈和CAS操作來保證執行緒安全問題,因為可能存在多個執行緒同時釋放同步狀態的場景。釋放成功後通過doReleaseShared()
方法喚醒後繼結點。
private void doReleaseShared() {
/*
* 保證釋放動作(向同步等待佇列尾部)傳遞,即使沒有其他正在進行的
* 請求或釋放動作。如果頭節點的後繼節點需要喚醒,那麼執行喚醒
* 動作;如果不需要,將頭結點的等待狀態設定為PROPAGATE保證
* 喚醒傳遞。另外,為了防止過程中有新節點進入(佇列),這裡必
* 需做迴圈,所以,和其他unparkSuccessor方法使用方式不一樣
* 的是,如果(頭結點)等待狀態設定失敗,重新檢測。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
// 獲取頭節點對應的執行緒的狀態
int ws = h.waitStatus;
// 如果頭節點對應的執行緒是SIGNAL狀態,則意味著頭
//結點的後繼結點所對應的執行緒需要被unpark喚醒。
if (ws == Node.SIGNAL) {
// 修改頭結點對應的執行緒狀態設定為0。失敗的話,則繼續迴圈。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 喚醒頭結點h的後繼結點所對應的執行緒
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果頭結點發生變化,則繼續迴圈。否則,退出迴圈。
if (h == head) // loop if head changed
break;
}
}
//喚醒傳入結點的後繼結點對應的執行緒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//拿到後繼結點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒該執行緒
LockSupport.unpark(s.thread);
}
顯然doReleaseShared()
方法中通過呼叫unparkSuccessor(h)
方法喚醒head的後繼結點對應的執行緒。注意這裡把head的狀態設定為Node.PROPAGATE
是為了保證喚醒傳遞,博主認為是可能同時存在多個執行緒併發爭取資源,如果執行緒A已執行到doReleaseShared()方法中,正被喚醒後正準備替換head(實際上還沒替換),而執行緒B又跑來請求資源,此時呼叫setHeadAndPropagate(Node node, int propagate)
時,傳入的propagate=0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//喚醒後繼節點,因為是共享模式,所以允許多個執行緒同時獲取同步狀態
doReleaseShared();
}
但為了保證持續喚醒後繼結點的執行緒即doReleaseShared()
方法被呼叫,可以把head的waitStatus設定為Node.PROPAGATE
,這樣就保證執行緒B也可以執行doReleaseShared()
保證後續結點被喚醒或傳播,注意doReleaseShared()
可以同時被釋放操作和獲取操作呼叫,但目的都是為喚醒後繼節點,因為是共享模式,所以允許多個執行緒同時獲取同步狀態。ok~,釋放過程的分析到此完結,對於釋放操作的過程還是相對簡單些的,即嘗試更新state值,更新成功呼叫doReleaseShared()
方法喚醒後繼結點對應的執行緒。
公平鎖中的共享鎖
事實上公平鎖的中的共享模式實現除了在獲取同步狀態時與非公平鎖不同外,其他基本一樣,看看公平鎖的實現
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
//這裡是重點,先判斷佇列中是否有結點再執行
//同步狀態獲取。
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
從程式碼中可以看出,與非公平鎖tryAcquireShared(int acquires)
方法實現的唯一不同是,在嘗試獲取同步狀態前,先呼叫了hasQueuedPredecessors()
方法判斷同步佇列中是否存在結點,如果存在則返回-1,即將執行緒加入同步佇列等待。從而保證先到來的執行緒請求一定會先執行,也就是所謂的公平鎖。至於其他操作,與前面分析的非公平鎖一樣。
小結
ok~,到此我們通過對Semaphore的內部實現原理分析後,對共享鎖的實現有了基本的認識,即AQS中通過state值來控制對共享資源訪問的執行緒數,每當執行緒請求同步狀態成功,state值將會減1,如果超過限制數量的執行緒將被封裝共享模式的Node結點加入同步佇列等待,直到其他執行執行緒釋放同步狀態,才有機會獲得執行權,而每個執行緒執行完成任務釋放同步狀態後,state值將會增加1,這就是共享鎖的基本實現模型。至於公平鎖與非公平鎖的不同之處在於公平鎖會線上程請求同步狀態前,判斷同步佇列是否存在Node,如果存在就將請求執行緒封裝成Node結點加入同步佇列,從而保證每個執行緒獲取同步狀態都是先到先得的順序執行的。非公平鎖則是通過競爭的方式獲取,不管同步佇列是否存在Node結點,只有通過競爭獲取就可以獲取執行緒執行權。
ok~,本篇到此結束,上述原始碼解讀基於參考資料和個人的理解,如有誤處,請留言謝謝。