談談AQS(一)- 同步元件的基石
阿新 • • 發佈:2019-12-31
AbstractQueuedSynchronizer-最基礎的同步器模板
它是一個框架,一個模板
AQS只是一個框架,具體資源的獲取/釋放方式(tryAcquire/tryRealease)交由自定義同步器去實現,是一個抽象的模板類,體現了模板方法的設計模式。
核心變數state
/**
* The synchronization state.
*/
private volatile int state;
複製程式碼
AQS甚至於concurrent包中的所有類,都是在對volatile變數讀寫結合CAS操作的基礎上實現的。
可以重寫的方法(沒有限定final):
//獨佔式獲取同步狀態,需要CAS更新同步狀態
protected boolean tryAcquire(int arg);
//獨佔式釋放同步狀態
protected boolean tryRealease(int arg);
//共享式獲取同步狀態,返回值大於0表示成功,反之失敗
protected boolean tryAcquireShared(int arg);
//共享式釋放同步狀態
protected boolean tryRealeaseShared(int arg);
//當前同步器是否在獨佔模式下被執行緒佔用
protected boolean isHeldExclusively();
複製程式碼
- 這些方法都沒有直接宣告成abstract,而是直接丟擲異常。也就意味著不是每個子類都必須實現這些方法,比如獨佔鎖就完全不需要去實現和它不相關的shared方法。
不可重寫的方法(直接宣告final):
public final void acquire(int arg);
public final void acquireInterruptibly(int arg);
public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg);
public final boolean tryAcquireNanos(int arg,long nanos);
public final boolean tryAcquireSharedNanos (int arg,long nanos);
public final boolean release(int arg);
public final boolean releaseShared(int arg);
public final Collection<Thread> getQueueThreads();
複製程式碼
獨佔式的同步狀態獲取流程
首先呼叫acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
複製程式碼
-
tryAcquire
成功則結束,直接返回,可以開始執行臨界區的程式碼 - 否則,要將當前的執行緒構建成一個
Node.EXCLUSIVE
模式的節點,加入到佇列的末尾,並且呼叫acquireQueued
方法,讓這個節點進入自旋狀態。也就是說,這個節點加入同步佇列後,就會死迴圈的去獲取同步狀態,從而阻塞在了臨界區的外面。該執行緒的喚醒主要依靠前驅結點的出隊或者阻塞執行緒被中斷。
private Node addWaiter(Node mode) {
/**
* 以一個Node類的靜態變數Node.EXCLUSIVE/Node.SHARED
* 作為mode標誌節點的模式
* mode節點會被設定成nextWaiter
* 所以nextWaiter可用來判斷節點的兩種模式
*/
Node node = new Node(Thread.currentThread(),mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//保證多個執行緒同時操作時執行緒安全
if (compareAndSetTail(pred,node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
複製程式碼
final boolean acquireQueued(final Node node,int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋獲取同步狀態
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p,node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製程式碼
獨佔式同步狀態的釋放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製程式碼
獨佔式超時獲取同步狀態流程
超時獲取同步狀態原理比較類似。
private boolean doAcquireNanos(int arg,long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p,node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this,nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製程式碼
共享式同步狀態的獲取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
複製程式碼
雖然是共享式的獲取同步狀態,但也不意味著這個同步狀態可以被無限的獲取,還是有一個限制的。例如Semaphore中的permits即是一個限制。
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available,remaining))
return remaining;
}
}
複製程式碼
當remaining < 0 時,就會觸發tryAcquireShared(arg) < 0, 也就是嘗試共享式的獲取失敗。 那麼這時候也會像獨佔式時那樣,加入同步佇列,自旋的去獲取同步狀態。也就是超過state允許的最大值後,也不能去執行臨界區的程式碼。 唯一的區別就是,將當前執行緒封裝成了一個Node.SHARED模式的節點。在自旋狀態中嘗試獲取成功後,會重新設定頭節點並且將這個影響藉助連結串列中的節點以及節點的waitStatus傳播下去。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node,r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p,node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製程式碼
LockSupport工具包
以上方法中關於對執行緒的阻塞和喚醒主要通過LockSupport這個包中的幾個方法來實現。
- void park() : 阻塞當前執行緒,執行緒不再參與處理器排程,如果呼叫unpark方法或者當前執行緒被中斷,才能從park()中返回。
- void parkNanos(long nanos): 增加超時返回的功能。
- void parkUntil(long deadline):將執行緒阻塞到deadline時間。
Doug Lea 為了演示這個類的基本功能,不借助aqs實現了一個先進先出的互斥鎖。
class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);
// Block while not first in queue or cannot acquire lock
while (waiters.peek() != current ||
!locked.compareAndSet(false,true)) {
LockSupport.park(this);
if (Thread.interrupted()) // ignore interrupts while waiting
wasInterrupted = true;
}
waiters.remove();
if (wasInterrupted) // reassert interrupt status on exit
current.interrupt();
}
public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
複製程式碼