Java AQS 抽象的佇列式的同步器 AbstractQueuedSynchronizer 學習筆記 2022-3-23
原文地址:http://www.cnblogs.com/waterystone/p/4920797.html
AQS、抽象的佇列式的同步器
AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用ReentrantLock/Semaphore/CountDownLatch
整體架構
它維護了 一個volatile int state(代表共享資源) 和 一個FIFO執行緒等待佇列(多執行緒爭用資源被阻塞時會進入此佇列);
總結:整體架構由 volatile修飾的int型state變數 + FIFO執行緒等待佇列 組成
state的訪問方式有三種:
getState()
setState()
compareAndSetState()
AQS定義兩種資源共享方式:
(1)獨佔、Exclusive(只有一個執行緒能執行,如ReentrantLock)
(2)共享、Share(多個執行緒可同時執行,如Semaphore/CountDownLatch)
不同的自定義同步器爭用共享資源的方式也不同;自定義同步器在實現時只需要 實現共享資源state的獲取與釋放方式 即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了;
自定義同步器實現時主要實現以下幾種方法:
1、isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它
2、tryAcquire(int):獨佔方式,嘗試獲取資源;成功則返回true,失敗則返回false
3、tryRelease(int):獨佔方式,嘗試釋放資源;成功則返回true,失敗則返回false
4、tryAcquireShared(int):共享方式,嘗試獲取資源;負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源
5、tryReleaseShared(int):共享方式,嘗試釋放資源;如果釋放後允許喚醒後續等待結點返回true,否則返回false
ReentrantLock
以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A執行緒lock()時,會呼叫tryAcquire()獨佔該鎖並將state+1。此後,其他執行緒再
tryAcquire()時就會失敗,直到A執行緒unlock()到state=0(即釋放鎖)為止,其它執行緒才有機會獲取該鎖。當然,釋放鎖之前,A執行緒自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念;但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的
CountDownLatch
以CountDownLatch以例,任務分為N個子執行緒去執行,state也初始化為N(注意N要與執行緒個數一致)。這N個子執行緒是並行執行的,每個子執行緒執行完後countDown()一次,state會CAS減1。等到所有子執行緒都執行完後(即state=0),會unpark()主呼叫執行緒,然後主呼叫執行緒就會從await()函式返回,繼續後餘動作。
總結
一般來說,自定義同步器 要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可;但AQS也支援自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock
原始碼學習
結點狀態waitStatus
Node結點是對每一個等待獲取資源的執行緒的封裝,其包含了需要同步的執行緒本身及其等待狀態,如是否被阻塞、是否等待喚醒、是否已經被取消等。
變數waitStatus則表示當前Node結點的等待狀態,共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
(1)CANCELLED(1):表示當前結點已取消排程。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態後的結點將不會再變化
(2)SIGNAL(-1):表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為SIGNAL
(3)CONDITION(-2):表示結點等待在Condition上,當其他執行緒呼叫了Condition的signal()方法後,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖
(4)PROPAGATE(-3):共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點
(5)0:新結點入隊時的預設狀態
注意,負值表示結點處於有效等待狀態,而正值表示結點已被取消。所以原始碼中很多地方用>0、<0來判斷結點的狀態是否正常
acquire(int)
此方法是獨佔模式下執行緒獲取共享資源的頂層入口;如果獲取到資源,執行緒直接返回,否則進入等待佇列,直到獲取到資源為止,且整個過程忽略中斷的影響;這也正是lock()的語義,當然不僅僅只限於lock();獲取到資源後,執行緒就可以去執行其臨界區程式碼了。下面是acquire()的原始碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
函式流程如下
1、tryAcquire():嘗試直接去獲取資源,如果成功則直接返回(這裡體現了非公平鎖,每個執行緒獲取鎖時會嘗試直接搶佔加塞一次,而CLH佇列中可能還有別的執行緒在等待);
2、addWaiter():將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
3、acquireQueued():使執行緒阻塞在等待佇列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
如果執行緒在等待過程中被中斷過,它是不響應的;只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上
tryAcquire(int)
此方法嘗試去獲取獨佔資源;如果獲取成功,則直接返回true,否則直接返回false;這也正是tryLock()的語義,還是那句話,當然不僅僅只限於tryLock();如下是tryAcquire()的原始碼:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
什麼?直接throw異常?說好的功能呢?
好吧,還記得概述裡講的AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現嗎?就是這裡了!!!
AQS這裡只定義了一個介面,具體資源的獲取交由自定義同步器去實現了(通過state的get/set/CAS)!!!
至於能不能重入,能不能加塞,那就看具體的自定義同步器怎麼去設計了!!!
當然,自定義同步器在進行資源訪問時要考慮執行緒安全的影響
這裡之所以沒有定義成abstract,是因為獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實現另一模式下的介面。
說到底,Doug Lea還是站在咱們開發者的角度,儘量減少不必要的工作量。
addWaiter(Node)
此方法用於將當前執行緒加入到等待佇列的隊尾,並返回當前執行緒所在的結點
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
enq(Node):此方法用於將node加入隊尾
private Node enq(final Node node) {
//CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) { // 佇列為空,建立一個空的標誌結點作為head結點,並將tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果你看過AtomicInteger.getAndIncrement()函式原始碼,那麼相信你一眼便看出這段程式碼的精華。CAS自旋volatile變數,是一種很經典的用法
......t太多了,還是慢慢看吧
https://www.cnblogs.com/waterystone/p/4920797.html