原始碼分析AQS獨佔鎖、共享鎖和Condition的實現
AbstractQueuedSynchronizer
是java.util.concurrent
包下非常重要和基礎的類,concurrent包下提供了一系列的同步操作需要的工具,包括了ReentrantLock
、ReentrantReadWriteLock
、ThreadPoolExecutor
、CountDownLatch
、LinkedBlockingQueue
等等,該包下很多的工具類都直接或者間接基於AQS提供的獨佔鎖、共享鎖和等待佇列實現了各自的同步需求。
一、AQS的設計:①AQS將資源抽象成一個int型的值:int state
,通過對state的修改,達到對AQS不同狀態的控制,對state變數的修改由其子類實現;②並不是state>0才代表有資源可以申請,如ReentrantLock
acquire(int arg)、release(int arg)
,分別用來獲取和釋放一定量的資源,即增大和減小state的值。當執行緒執行上述兩個方法時,AQS的子類嘗試修改state的值;④在acquire()
方法中:若state大小符合特定需求(具體邏輯由子類實現),則執行緒會鎖定同步器,否則將當前執行緒加入到同步佇列中;在release()
方法中:若state大小符合特定需求,則釋放掉當前執行緒佔有的資源,喚醒同步佇列中的執行緒。
二、通過ReentrantLock
ReentrantLock
中對state的設計為:state==0的時候代表沒有執行緒持有同步器,在state>0的時候,其它執行緒是不能獲取同步器的,必須加入到同步佇列中等待。
1、從lock()
方法開始:這裡只分析NonfairSync
static final class NonfairSync extends Sync {
//從這個方法開始申請鎖
final void lock() {
//當前state的值為 0,執行緒直接獲取到鎖,setExclusiveOwnerThread()方法可以設定當前持有鎖的執行緒
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//說明已經有執行緒持有了鎖,則acquire()申請資源,acquire()函式的作用就是:
//首先呼叫tryAcquire(),嘗試獲取資源,在此的具體實現就是nonfairTryAcquire()方法
//獲取資源失敗則將執行緒加入到同步佇列中等待
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
//嘗試獲取資源的地方
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//首先還是判斷是否有執行緒已經持有鎖(c=0),還未有執行緒持有鎖則讓該執行緒持有鎖,設定state的值為acquires,返回true
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//已經有執行緒持有鎖,判斷持有鎖的執行緒和申請鎖的執行緒是否是同一個執行緒,如果是,讓state值增加acquires,返回true
//也就是同一個執行緒可以重複獲取到同步器,從這裡可以看出為什麼ReentrantLock鎖是可重入的了
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//不是上面兩種情況,即獲取資源失敗,返回false
return false;
//判斷獲取到鎖的是不是當前執行緒
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
//獲取Condition例項,ConditionObject的定義在AQS中
return new ConditionObject();
}
final Thread getOwner() {
//獲取當前持有鎖的執行緒,state值為 0,代表還未有執行緒持有鎖
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
//state不為0,就代表有執行緒持有鎖了
final boolean isLocked() {
return getState() != 0;
}
}
2、Node
類:是同步佇列中節點的描述,用來儲存等待的執行緒、狀態、前驅和後繼節點等的資訊。
static final class Node {
//標識共享模式的節點,共享模式下Node節點的nextWaiter變數設定為這個值
static final Node SHARED = new Node();
//標識獨佔模式的節點,獨佔模式nextWaiter變數是null
static final Node EXCLUSIVE = null;
//同步佇列中被取消的節點,被中斷或者等待超時,該狀態的節點不再被使用
static final int CANCELLED = 1;
//標識後繼節點處於被喚醒的狀態,當節點釋放同步器後,會喚醒後繼節點中第一個處於該狀態的節點
static final int SIGNAL = -1;
//描述處於等待佇列中的節點,節點中的執行緒等待在Condition上,
//在呼叫signal()之後,被喚醒的節點將從等待佇列中轉移到同步佇列中繼續等待
static final int CONDITION = -2;
//確保共享模式下可以喚醒後續的共享節點
static final int PROPAGATE = -3;
//儲存節點狀態,值為 0、CANCELLED、SIGNAL、CONDITION、PROPAGATE
volatile int waitStatus;
//連結串列的上一個節點
volatile Node prev;
//連結串列的下一個節點
volatile Node next;
//儲存被阻塞的執行緒
volatile Thread thread;
//用來儲存某個Condition上的等待佇列,也用來判斷節點是否是共享節點
Node nextWaiter;
//判斷節點是否是共享模式,通過判斷nextWaiter==SHARED
final boolean isShared() {
return nextWaiter == SHARED;
}
//找到前驅節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
U.putObject(this, THREAD, Thread.currentThread());
}
Node(int waitStatus) {
U.putInt(this, WAITSTATUS, waitStatus);
U.putObject(this, THREAD, Thread.currentThread());
}
/** CAS設定節點的狀態 */
final boolean compareAndSetWaitStatus(int expect, int update) {
return U.compareAndSwapInt(this, WAITSTATUS, expect, update);
}
/** CAS設定後繼節點 */
final boolean compareAndSetNext(Node expect, Node update) {
return U.compareAndSwapObject(this, NEXT, expect, update);
}
//Unsafe 是CAS的工具類,CAS是虛擬機器實現的原子性操作
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long NEXT;
static final long PREV;
private static final long THREAD;
private static final long WAITSTATUS;
static {
try {
//分別拿到Node節點的next、prev、thread、waitStatus變數的控制代碼,CAS通過控制代碼修改這些變數
NEXT = U.objectFieldOffset
(Node.class.getDeclaredField("next"));
PREV = U.objectFieldOffset
(Node.class.getDeclaredField("prev"));
THREAD = U.objectFieldOffset
(Node.class.getDeclaredField("thread"));
WAITSTATUS = U.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
3、AQS的acquire(int arg)
方法:從NonfairSync
的實現看出,申請鎖(資源)的時候,首先會執行acquire()
方法。
①acquire()
方法,嘗試獲取資源,失敗將執行緒加入同步佇列中
public final void acquire(int arg) {
//tryAcquire()嘗試獲取資源,返回false,執行acquireQueued()方法,將執行緒加入同步佇列中,讓執行緒進入等待狀態
//addWaiter()這裡傳入的是獨佔標誌(Node.EXCLUSIVE),說明該節點是一個獨佔節點
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
②addWaiter()
方法,將當前執行緒封裝進Node節點,並將節點加入到同步佇列中,如果頭結點為null,則初始化頭結點和尾節點,並將當前節點連結在尾節點之後。
private Node addWaiter(Node mode) {
//初始化一個Node節點,變數nextWaiter為mode
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
//將當前節點設定成尾節點,連結在之前的尾節點之後
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
//初始化頭節點和尾節點指向同一個節點
initializeSyncQueue();
}
}
}
③acquireQueued()
方法,自旋直到執行緒持有同步器,這裡執行緒將進入等待狀態,直到其它執行緒釋放資源,喚醒處於隊首的執行緒,喚醒後該執行緒必須要繼續獲取到資源
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
//自旋,直到執行緒持有同步器,(tryAcquire()成功)
for (;;) {
final Node p = node.predecessor();
//該節點的前驅是頭節點,說明喚醒順序先進先出的
//嘗試獲取資源,成功則返回interrupted,讓該執行緒持有同步器
if (p == head && tryAcquire(arg)) {
setHead(node); //設定節點為頭節點
p.next = null; // 幫助釋放記憶體
return interrupted;
}
//檢查節點狀態,如果可以進入等待狀態,則讓執行緒進入等待狀態
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
④shouldParkAfterFailedAcquire()
方法,設定節點的狀態,要為node節點找到一個符合需求的前驅節點,並設定其狀態為SIGNAL,表明node節點是一個在等待狀態的正常的節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前驅節點的waitStatus為SIGNAL,說明後繼節點可以進入等待狀態
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//前驅節點是CANCELLED,則跳過這些節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驅節點狀態正常的情況下,設定狀態為SIGNAL,標誌後繼節點可以進入等待狀態
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
⑤parkAndCheckInterrupt()
方法:藉助LockSupport
將當前執行緒阻塞,在喚醒之後返回中斷狀態,因為在阻塞狀態下,執行緒不響應中斷,在喚醒之後,如果有需要則重新處理該中斷。
private final boolean parkAndCheckInterrupt() {
//LockSupport中的park() 和 unpark() 的作用分別是阻塞執行緒和解除阻塞執行緒
//其它說明可百度
LockSupport.park(this);
return Thread.interrupted();
}
4、AQS的release(int arg)
方法:acquire()
方法完成了資源獲取成功後持有同步器,獲取失敗後加入到同步佇列中,並將執行緒阻塞等一系列操作。下面分析同步佇列執行緒被喚醒的過程。
①ReentrantLock
的unlock()
方法只調用了release(1)
方法:tryRelease()
由子類實現,用來判斷是否可以喚醒執行緒。如果可以則呼叫unparkSuccessor()
喚醒一個執行緒
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//這裡傳入的是頭結點,也就是說喚醒的是頭結點的後繼節點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
②ReentrantLock
的tryRelease()
方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state==0才能喚醒同步佇列中的執行緒
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//重新設定state的值
setState(c);
return free;
}
③ unparkSuccessor(Node node)
方法:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
//找到需要被喚醒的節點
Node s = node.next;
//如果當前節點的後繼節點不符合要求(為null或者被取消了),則從尾節點向前查詢
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
//喚醒該執行緒,執行緒的阻塞是在acquireQueued()方法中,執行緒將從阻塞的地方繼續執行
if (s != null)
LockSupport.unpark(s.thread);
}
三、通過CountDownLatch
分析共享鎖:CountDownLatch
在釋放同步器之後會一次喚醒所有等待在同步佇列上的執行緒。主要功能是:允許一個或者多個執行緒等待其他執行緒完成任務,然後這些等待的執行緒同時被喚醒,使用很簡單,下面主要分析共享鎖的實現,在ReentrantLock
中分析過的函式不再分析。
1、CountDownLatch
中的自定義同步器:
//CountDownLatch的建構函式,傳入了一個int值,作為state的初始值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
//設定state的初始值為自己傳入的引數
setState(count);
}
int getCount() {
return getState();
}
//只有state為 0的時候,也就是其它執行緒全部執行完成後,才算獲取資源成功,返回1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
//只有在state==1的時候才會返回true,進而釋放同步器,喚醒等待的執行緒
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
2、CountDownLatch
中的await()
方法:獲取資源,只調用了AQS的acquireSharedInterruptibly()
方法,該方法會丟擲中斷異常,在呼叫await()
時要捕獲異常。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//嘗試獲取資源,返回int型值,tryAcquire()返回bool值
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
3、doAcquireSharedInterruptibly()
方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//將一個共享節點新增到同步佇列中
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//嘗試獲取資源成功後(r>=0),setHeadAndPropagate()將head節點指向node,然後喚醒同步佇列中的共享節點
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
//修改節點的狀態,阻塞執行緒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果等待過程中產生了中斷,則丟擲中斷異常
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
4、setHeadAndPropagate()
方法:設定頭結點為node節點,並嘗試喚醒後續的共享節點
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;