J.U.C--locks--AQS分析
看一下AbstractQueuedSynchronizer(下面簡稱AQS)的子類就行知道,J.U.C中宣傳的封裝良好的同步工具類Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask等盡管各自都有不同特征,可是其內部的實現都與AQS分不開。
所以分析AQS的實現原理對其余顯示鎖或則同步工具類的理解非常重要。
1.主要屬性和內部類
這一篇blog主要分析AQS的實現中的重要屬性和內部類。尤其是對於ReentrantLock和ReentrantReadWriteLock。其lock()方法和unlock()方法的實現終於都是由AQS同步器實現的。由此可見分析AQS類的重要性可見一斑。
在AQS中,我們先看屬性遠比看方法來的更加easy理解這個類的作用。首先看AQS類的主要屬性:
//等待隊列的頭指針
private transient volatile Node head;
//等待隊列的尾指針
private transient volatile Node tail;
//同步器的狀態位,註意這裏state是聲明了volatile。保證了可視性
private volatile int state;
凝視事實上已經告訴我們了。Node類型的 head 和 tail 是一個FIFO的wait queue。一個int類型的狀態位state。到這裏也能猜到AQS對外呈現(或者說聲明)的主要行為就是由一個狀態位和一個有序隊列來配合完畢。
state屬性
對於state狀態的管理,在AQS中僅僅通過三個方法來實現:
java.util.concurrent.locks.AbstractQueuedSynchronizer.getState();
java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int);
java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int);
前面兩個函數事實上就是get和set方法。
第三個函數事實上是通過Unsafe類實現CAS設置狀態值,CAS+volatile 保證了state變量的線程安全。
Node結點
前面還提到了同步器的實現還依賴於一個FIFO的隊列。隊列中的元素Node就是保存著線程引用和線程狀態的容器,每一個線程對同步器的訪問。都可以看做是隊列中的一個節點。
Node類的源代碼不多,我直接所有粘貼出來:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/** 構造器 */
Node() { // Used to establish initial head or SHARED marker
}
/** 構造器 */
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
/** 構造器 */
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node類主要有5個屬性:
volatile int waitStatus;//
volatile Node prev;//
volatile Node next;//
volatile Thread thread;//
Node nextWaiter;//
以上五個成員變量主要負責保存該節點的線程引用,同步等待隊列(下面簡稱sync隊列)的前驅和後繼節點。同一時候也包括了同步狀態。
對這5個變量的解釋例如以下:
屬性名稱 | 描寫敘述 |
---|---|
int waitStatus | 表示節點的狀態。當中包括的狀態有: 1.CANCELLED,值為1,表示當前的線程被取消。 2.SIGNAL,值為-1。表示當前節點的後繼節點包括的線程須要執行,也就是unpark; 3.CONDITION,值為-2。表示當前節點在等待condition,也就是在condition隊列中; 4.PROPAGATE,值為-3。表示當前場景下興許的acquireShared可以得以執行; 5.值為0,表示當前節點在sync隊列中。等待著獲取鎖。 |
Node prev | 前驅節點。比方當前節點被取消,那就須要前驅節點和後繼節點來完畢連接。 |
Node next | 後繼節點。 |
Node nextWaiter | 存儲condition隊列中的後繼節點。 |
Thread thread | 入隊列時的當前線程。 |
節點成為sync隊列和condition隊列構建的基礎,在同步器中就包括了sync隊列。
同步器擁有三個成員變量:sync隊列的頭結點head、sync隊列的尾節點tail和狀態state。對於鎖的獲取。請求形成節點,將其掛載在尾部。而鎖資源的轉移(釋放再獲取)是從頭部開始向後進行。對於同步器維護的狀態state,多個線程對其的獲取將會產生一個鏈式的結構。
2.重要函數的源代碼解析
獲取鎖相關函數
acquire(int arg);//以獨占模式獲取對象,忽略中斷。
acquireInterruptibly(int arg);//以獨占模式獲取對象。假設被中斷則中止。
acquireShared(int arg);//以共享模式獲取對象,忽略中斷。
acquireSharedInterruptibly(int arg);//以共享模式獲取對象,假設被中斷則中止。
tryAcquire(int arg);//試圖在獨占模式下獲取對象狀態。
tryAcquireNanos(int arg, long nanosTimeout);//試圖以獨占模式獲取對象,假設被中斷則中止。假設到了給定超時時間,則會失敗。
tryAcquireShared(int arg);//試圖在共享模式下獲取對象狀態。
tryAcquireSharedNanos(int arg, long nanosTimeout);//試圖以共享模式獲取對象,假設被中斷則中止。假設到了給定超時時間。則會失敗。
釋放鎖相關函數
release(int arg);//以獨占模式釋放對象。
releaseShared(int arg);//以共享模式釋放對象
tryRelease(int arg);//試圖設置狀態來反映獨占模式下的一個釋放。
tryReleaseShared(int arg);//試圖設置狀態來反映共享模式下的一個釋放。
1)acquire(int arg)函數
首先看看Javadoc的定義:
以獨占模式獲取對象,忽略中斷。
通過至少調用一次 tryAcquire(int) 來實現此方法,並在成功時返回。否則在成功之前,一直調用 tryAcquire(int) 將線程加入隊列,線程可能反復被堵塞或不被堵塞。可以使用此方法來實現 Lock.lock() 方法。
可知該函數是以獨占模式獲取對象而且忽略中斷,完畢synchronized語義。
在AQS類中的源代碼例如以下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
該函數主要完畢的邏輯例如以下:
1)首先調用tryAcquire(arg)函數嘗試獲取;
嘗試更改狀態state的值,而且保證原子性。
在tryAcquire方法中使用了同步器提供的對state操作的方法。利用compareAndSet保證僅僅有一個線程可以對狀態進行成功改動,而沒有成功改動的線程將進入sync隊列排隊。
值得註意的是這個函數在AQS中並沒有實現,而是在其繼承子類中實現(比方在ReentrantLock類中的內部類中NonfairSync和FairSync中均實現了這種方法)。
當獲取成功時,就會返回true,這時源代碼中的if語句就會直接執行if(0),也就是不滿足執行條件。
2)假設獲取不到,將當前線程構造成節點Node並加入sync隊列。
進入隊列的每一個線程都是一個節點Node,從而形成了一個雙向隊列,相似CLH隊列,這樣做的目的是線程間的通信會被限制在較小規模(也就是兩個節點左右)。
3)再次嘗試獲取。假設沒有獲取到那麽將當前線程從線程調度器上摘下。進入等待狀態。
使用LockSupport將當前線程unpark,關於LockSupport興許會具體介紹。
看看addWaiter()方法的邏輯:
凝視解釋的是:通過給定的模式和當前線程創建同步隊列結點。
源代碼例如以下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 高速嘗試在尾部加入
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//
enq(node);
return node;
}
//Inserts node into queue, initializing if necessary. See picture above.
private Node enq(final Node node) {
//死循環直至return
for (;;) {
Node t = tail;
//必須初始化的步驟
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上面的邏輯解釋:
1)使用當前線程構造Node;
對於一個新創建的節點須要做的是:將新節點前驅節點指向尾節點(current.prev = tail)。尾節點指向它(tail = current),原有的尾節點的後繼節點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節點的設置來保證的,也就是compareAndSetTail來完畢的。
2)先行嘗試在隊尾加入;
假設尾節點已經有了。然後做例如以下操作:
(1)分配引用pred指向尾節點。
(2)調用compareAndSetTail(pred, node)將新節點更新為尾節點;
(3)直接return,返回新插入的結點。
3)假設隊尾加入失敗或者是第一個入隊的節點。
假設是第1個節點,也就是sync隊列沒有初始化,那麽會進入到enq這種方法,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進入enq這種方法。
enq(node)函數的邏輯是確保進入的Node都會有機會順序的加入到sync隊列中,而加入的過程例如以下:
(1)假設尾節點為空,那麽原子化的分配一個頭節點。並將尾節點指向頭節點,這一步是初始化;
(2)然後是反復在addWaiter中做的工作,可是在一個for (;;)的循環中,直到當前節點入隊為止。
至此。addWaiter()方法的邏輯分析完畢。接下來就是分析(final Node node, int arg) 方法的邏輯。
進入sync隊列之後。接下來就是要進行鎖的獲取,或者說是訪問控制了。僅僅有一個線程可以在同一時刻繼續的執行。而其它的進入等待狀態。而每一個線程都是一個獨立的個體,它們自省的觀察。當條件滿足的時候(自己的前驅是頭結點而且原子性的獲取了狀態)。那麽這個線程可以繼續執行。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取前驅結點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上述邏輯主要包括:
1. 獲取當前節點的前驅節點;
須要獲取當前節點的前驅節點。而頭結點所相應的含義是當前站有鎖且正在執行。
2. 當前驅節點是頭結點而且可以獲取狀態,代表該當前節點占有鎖;
假設滿足上述條件,那麽代表可以占有鎖。根據節點對鎖占有的含義,設置頭結點為當前節點。
3. 否則進入等待狀態。
假設沒有輪到當前節點執行。那麽將當前線程從線程調度器上摘下。也就是進入等待狀態。
這裏針對acquire做一下總結:
1. 狀態的維護;
須要在鎖定時。須要維護一個狀態(int類型)。而對狀態的操作是原子和非堵塞的,通過同步器提供的對狀態訪問的方法對狀態進行操縱,而且利用compareAndSet來確保原子性的改動。
2. 狀態的獲取;
一旦成功的改動了狀態,當前線程或者說節點,就被設置為頭節點。
3. sync隊列的維護。
在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅節點不是頭節點或者沒有獲取到資源)進入睡眠狀態,停止線程調度器對當前節點線程的調度。
這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關鍵,就是前驅節點的通知,而這一個過程就是釋放。釋放會通知它的後繼節點從睡眠中返回準備執行。
下面的流程圖基本描寫敘述了一次acquire所須要經歷的過程:
如上圖所看到的,當中的判定退出隊列的條件,判定條件是否滿足和休眠當前線程就是完畢了自旋spin的過程。
2)release(int arg)
首先看看Javadoc的定義:
以獨占模式釋放對象。假設 tryRelease(int) 返回 true。則通過消除一個或多個線程的堵塞來實現此方法。可以使用此方法來實現 Lock.unlock() 方法
源代碼例如以下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在unlock方法的實現中。使用了同步器的release方法。
相對於在之前的acquire方法中可以得出調用acquire,保證可以獲取到鎖(成功獲取狀態),而release則表示將狀態設置回去,也就是將資源釋放,或者說將鎖釋放。
上述邏輯主要包括:
1)嘗試釋放狀態。
tryRelease()函數可以保證原子化的將狀態設置回去。當然須要使用compareAndSet來保證。假設釋放狀態成功過之後。將會進入後繼節點的喚醒過程。
2. 喚醒當前節點的後繼節點所包括的線程。
通過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。
private void unparkSuccessor(Node node) {
// 將狀態設置為同步狀態
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取當前節點的後繼節點,假設滿足狀態,那麽進行喚醒操作
// 假設沒有滿足狀態,從尾部開始找尋符合要求的節點並將其喚醒
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
上述邏輯主要包括,該方法取出了當前節點的next引用,然後對其線程(Node)進行了喚醒。這時就僅僅有一個或合理個數的線程被喚醒。被喚醒的線程繼續進行對資源的獲取與爭奪。
回想整個資源的獲取和釋放過程:
1)在獲取時,維護了一個sync隊列。每一個節點都是一個線程在進行自旋,而根據就是自己是否是首節點的後繼而且可以獲取資源;
2)在釋放時。僅僅須要將資源還回去。然後通知一下後繼節點並將其喚醒。
這裏須要註意,隊列的維護(首節點的更換)是依靠消費者(獲取時)來完畢的,也就是說在滿足了自旋退出的條件時的一刻。這個節點就會被設置成為首節點。
至此AQS基本的兩個函數分析完畢。這兩個函數也是lock()函數和unlock()函數的核心。
J.U.C--locks--AQS分析