ReentrantLock原始碼探究1:非公平鎖的獲取和釋放
阿新 • • 發佈:2019-01-14
1.AQS簡單介紹
Sync是ReentrantLock的一個內部類,它繼承了AbstractQueuedSynchronizer,即AQS,在CountDownLatch、FutureTask、Semaphore、ReentrantLock等原始碼中,我們都能看到它們的身影,足見其重要性。此處我們需要先了解下AQS才能更愉悅地閱讀原始碼。
AQS中是基於FIFO佇列的實現,那麼它必然包含佇列中元素的定義,在這裡它是Node:
屬 性 | 定 義 |
---|---|
Node SHARED = new Node() | 表示Node處於共享模式 |
Node EXCLUSIVE = null | 表示Node處於獨佔模式 |
int CANCELLED = 1 | 因為超時或者中斷,Node被設定為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出佇列,被GC回收 |
int SIGNAL = -1 | 表示這個Node的繼任Node被阻塞了,到時需要通知它 |
int CONDITION = -2 | 表示這個Node在條件佇列中,因為等待某個條件而被阻塞 |
int PROPAGATE = -3 | 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取可以無條件傳播 |
int waitStatus | 0,新Node會處於這種狀態 |
Node prev | 佇列中某個Node的前驅Node |
Node next | 佇列中某個Node的後繼Node |
Thread thread | 這個Node持有的執行緒,表示等待鎖的執行緒 |
Node nextWaiter | 表示下一個等待condition的Node |
AQS中包含的方法有
屬性/方法 | 含 義 |
---|---|
Thread exclusiveOwnerThread | 這個是AQS父類AbstractOwnableSynchronizer的屬性,表示獨佔模式同步器的當前擁有者 |
Node | 上面已經介紹過了,FIFO佇列的基本單位 |
Node head | FIFO佇列中的頭Node |
Node tail | FIFO佇列中的尾Node |
int state | 同步狀態,0表示未鎖 |
int getState() | 獲取同步狀態 |
setState(int newState) | 設定同步狀態 |
boolean compareAndSetState(int expect, int update) | 利用CAS進行State的設定 |
long spinForTimeoutThreshold = 1000L | 執行緒自旋等待的時間 |
Node enq(final Node node) | 插入一個Node到FIFO佇列中 |
Node addWaiter(Node mode) | 為當前執行緒和指定模式建立並擴充一個等待佇列 |
void setHead(Node node) | 設定佇列的頭Node |
void unparkSuccessor(Node node) | 如果存在的話,喚起Node持有的執行緒 |
void doReleaseShared() | 共享模式下做釋放鎖的動作 |
void cancelAcquire(Node node) | 取消正在進行的Node獲取鎖的嘗試 |
boolean shouldParkAfterFailedAcquire(Node pred, Node node) | 在嘗試獲取鎖失敗後是否應該禁用當前執行緒並等待 |
void selfInterrupt() | 中斷當前執行緒本身 |
boolean parkAndCheckInterrupt() | 禁用當前執行緒進入等待狀態並中斷執行緒本身 |
boolean acquireQueued(final Node node, int arg) | 佇列中的執行緒獲取鎖 |
tryAcquire(int arg) | 嘗試獲得鎖(由AQS的子類實現它) |
tryRelease(int arg) | 嘗試釋放鎖(由AQS的子類實現它) |
isHeldExclusively() | 是否獨自持有鎖 |
acquire(int arg) | 獲取鎖 |
release(int arg) | 釋放鎖 |
compareAndSetHead(Node update) | 利用CAS設定頭Node |
compareAndSetTail(Node expect, Node update) | 利用CAS設定尾Node |
compareAndSetWaitStatus(Node node, int expect, int update) | 利用CAS設定某個Node中的等待狀態 |
另外在原始碼中多處使用了CAS,有關CAS的內容,可檢視:
樂觀鎖的一種實現方式:CAS
2.非公平鎖的獲取過程
假設有兩個執行緒:執行緒1和執行緒2嘗試獲取同一個鎖(非公平鎖),過程如下
- 執行緒1呼叫lock方法
final void lock() {
if (compareAndSetState(0, 1)) //使用CAS將同步狀態設定為1
setExclusiveOwnerThread(Thread.currentThread());//成功則設定執行緒1為當前鎖的獨佔執行緒
else
acquire(1); //設定失敗時,嘗試獲取鎖
}
//上述程式碼正常情況下執行完畢後,執行緒1成為了獨佔執行緒。
- 執行緒2此時也嘗試獲取鎖,呼叫lock方法,此時CAS設定時會失敗,進入acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //重新獲取鎖失敗且執行緒發生了中斷,自行中斷
}
- 這裡面,會首先呼叫tryAcquire方法嘗試再次獲取鎖,因為我們演示的是非公平鎖,因此呼叫的方法是nonfairTryAcquire。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //current指向當前執行緒2
int c = getState(); //若執行緒1未釋放鎖,則c>0,若執行緒1已經釋放鎖,則c=0
if (c == 0) { //執行緒1已經釋放了鎖
if (compareAndSetState(0, acquires)) { //使用CAS將state設定為1
setExclusiveOwnerThread(current); //並設定執行緒2為獨佔執行緒
return true; //返回true,獲取鎖成功
}
}
//判斷該執行緒是否是重入,即之前已經獲取到了鎖
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; //每重入一次,將state+1。
if (nextc < 0) // overflow //state+1<0,說明原state為負數,丟擲異常
throw new Error("Maximum lock count exceeded");
setState(nextc); //設定state為新值
return true; //返回true,獲取重入鎖成功。
}
return false; //返回flase,獲取鎖失敗
}
- 此時執行緒2使用tryAcquire方法獲取鎖,如果也是失敗,那麼,會呼叫addWaiter(Node.EXCLUSIVE)方法
private Node addWaiter(Node mode) { //此處mode為獨佔模式
Node node = new Node(Thread.currentThread(), mode);//將當前執行緒(此處為執行緒2)繫結到新節點node上,並設定為獨佔模式
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; //獲取原佇列的尾節點pred
if (pred != null) { //若原尾節點pred非空,則說明已經存在一個佇列
node.prev = pred; //設定新節點node的前置為pred
if (compareAndSetTail(pred, node)) {//使用CAS設定新的尾節點為node
pred.next = node; //設定pred的後置為node,建立雙向連結
return node; //返回node
}
}
enq(node); //進入此處說明原佇列不存在,需要初始化佇列
return node;
}
private Node enq(final Node node) { //此處傳入的引數node是綁定了執行緒2的節點
for (;;) {
Node t = tail; //獲取原佇列的尾節點t
if (t == null) { // Must initialize //若尾節點為空,說明佇列尚未形成
if (compareAndSetHead(new Node())) //設定一個空的,未繫結任何執行緒的節點為新佇列的頭節點
tail = head; //新佇列只有一個節點,既是頭也是尾
} else { //若t非空,說明佇列已經形成
node.prev = t; //將node的前置設為t
if (compareAndSetTail(t, node)) { //CAS設定新的尾節點為node
t.next = node; //設定t的後繼為node,建立雙向連結
return t; //返回t
}
}
}
}
- 現在看外層方法acquireQueued,此時傳入的引數node是執行緒2所在節點,該方法的作用是在等待佇列中,當有其他執行緒釋放了資源,那麼佇列中在等待的執行緒就可以開始行動
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否獲取到資源
try {
boolean interrupted = false; //等待過程中是否被中斷
//自旋,維護等待佇列中執行緒的執行。
for (;;) {
final Node p = node.predecessor(); //獲取node的前置p
if (p == head && tryAcquire(arg)) { //若前置p為頭結點並且重新獲取鎖成功
setHead(node); //設定新的頭節點為node
p.next = null; // help GC //取消p和連結串列的連結
failed = false; //獲取資源未失敗
return interrupted; //等待過程未被中斷
}
if (shouldParkAfterFailedAcquire(p, node) && //若前置節點是Node.SIGNAL狀態
parkAndCheckInterrupt()) //將節點設定為Waitting狀態
interrupted = true; //此時執行緒中斷狀態為true
}
} finally {
if (failed) //如果獲取資源成功那麼取消獲取過程
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //獲取前置節點的等待狀態
if (ws == Node.SIGNAL) //Node.SIGNAL表示繼任者執行緒需要被喚醒,那麼就可以直接返回;
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //若ws>0,說明前驅被取消,那麼執行迴圈往前一直查詢,知道找到未被取消的,將node排在它的後面。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //進入else,說明ws=0或者Node.PROPAGATE
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//使用CAS設定前置的節點狀態為Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
該部分程式碼可以用現實中排隊辦理業務的情況來說明:
假設你排隊去辦理業務,隊伍很長,因此除了當前正在辦理業務的人,其他所有排隊的人都在低頭玩手機,且每個排隊的人有以下三種狀態:①.正常排隊,且辦完業務後會通知後面的人別玩手機了可以開始辦理業務了。②.發現隊伍過長,不排隊了,走了。③.正常排隊,辦完業務後不通知後面的人,直接走。
此時你進入該隊伍的尾部開始排隊。
1.第一步,判斷排隊在你前面的人是否會通知你,如果會通知,那麼我們就可以不用關心其他問題,在佇列中待著玩手機即可。
2.第二步,如果發現排在你前面的人不排隊了,要走了,那麼此時我們就得往前走一位,並開始不斷詢問前面的人是不是也準備不排隊了,直到我們排在了一個確定不會走的人後面。
3.第三步,排在你前面的人不是準備走的,但是他也不會通知你,那麼你就要告訴他,一定得在辦完業務後通知你。
當我們確定我們已經在佇列中待好後(前置會通知我們),那麼我們就可以開始休息。parkAndCheckInterrupt方法讓我們的執行緒進入等待的狀態,即休息狀態。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //呼叫park()使執行緒進入waiting狀態
return Thread.interrupted(); //如果被喚醒,檢視自己是不是被中斷的。
}
3.鎖的釋放過程
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //若tryRelease後無人佔用鎖
Node h = head; //獲取佇列的頭結點h
if (h != null && h.waitStatus != 0) //若h非空,且h的waitStatus不為0
unparkSuccessor(h); //喚醒後繼
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //當前state-1,得到c
if (Thread.currentThread() != getExclusiveOwnerThread()) //執行releas的不是獲取鎖的獨佔執行緒,丟擲異常
throw new IllegalMonitorStateException();
boolean free = false; //free用來標記鎖是否可獲取狀態
if (c == 0) { //若state=0
free = true; //那麼當前鎖是可獲取的
setExclusiveOwnerThread(null); //設定當前鎖的獨佔執行緒為null
}
setState(c); //設定當前state為c
return free; //返回鎖是否是可獲取狀態
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; //獲取當前執行緒對應節點的waitStatus
if (ws < 0) //將當前執行緒對應節點waitStatus置為0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; //獲取當前執行緒對應節點的後繼節點s
if (s == null || s.waitStatus > 0) { //若s為空或s的狀態是canceled
s = null; //將s設定為null。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //此處從尾到頭進行遍歷,找到佇列最前列的節點且狀態不是Canceled,將其設定為s。但此處為何從尾部開始遍歷尚未弄清楚。
s = t;
}
if (s != null) //若上述遍歷找到的s非空
LockSupport.unpark(s.thread); //呼叫lockSupport.unpark喚醒s對應的執行緒
}
release方法的邏輯仍然可以用一個辦理完業務的人的後續動作來進行說明:
1.若A辦理業務後無其他業務需要辦理,那麼表示當前業務視窗是free的。
2.A將自己的等待狀態置為0,相當於退出佇列。然後檢查自己後面的人是否是空或者取消排隊的狀態。若為真,將後置設為空。
3.從佇列的尾部遍歷到頭部,直到找到佇列最前頭的那個,且它的等待狀態不是取消狀態,那麼將其喚醒,告知他可以開始辦理業務了。
4.關於原始碼的一點疑問
本文中部分原始碼本人暫時也尚未能理解,希望各位大佬不吝賜教,主要有以下一些問題:
1.在unparkSuccessor方法中,找到佇列下一個節點並將其喚醒時,為什麼從尾到頭遍歷
if (s == null || s.waitStatus > 0) { //若s為空或s的狀態是canceled
s = null; //將s設定為null。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //倒序遍歷?
s = t;
}
2.在acquireQueued方法中,自旋結束後的finally程式碼塊的作用。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否獲取到資源
try {
boolean interrupted = false; //等待過程中是否被中斷
//自旋,維護等待佇列中執行緒的執行。
for (;;) {
final Node p = node.predecessor(); //獲取node的前置p
if (p == head && tryAcquire(arg)) { //若前置p為頭結點並且重新獲取鎖成功
setHead(node); //設定新的頭節點為node
p.next = null; // help GC //取消p和連結串列的連結
failed = false; //獲取資源未失敗
return interrupted; //等待過程未被中斷
}
if (shouldParkAfterFailedAcquire(p, node) && //若前置節點是Node.SIGNAL狀態
parkAndCheckInterrupt()) //將節點設定為Waitting狀態
interrupted = true; //此時執行緒中斷狀態為true
}
} finally {
if (failed) //如果自旋結束,那麼說明failed = false已經執行了,那麼這個canclAcquire方法什麼情況下會執行?
cancelAcquire(node);
}
}