1. 程式人生 > >AQS原始碼詳細解讀

AQS原始碼詳細解讀

# AQS原始碼詳細解讀 [TOC] ## 基礎 在講解AQS之前,有幾個額外的知識需要了解。知道了這些,才能明白AQS框架中很多程式碼的道理。 + CAS相關知識 + 通過標識位進行執行緒掛起的併發程式設計正規化 + MPSC佇列的實現技巧 歡迎加入技術交流群186233599討論交流,也歡迎關注筆者公眾號:風火說。 ### CAS相關知識 CAS相關知識具體不表,請百度相關概念 ### 通過標識位進行執行緒掛起的併發程式設計正規化 一個執行緒通過一個標識位來表明自己進入掛起狀態,那麼在該執行緒將掛起標識位設定為真時,需要再次檢查所有的資源條件,而後才能真正的將自己掛起。也就是所謂的二次檢查。因為別的執行緒需要通過掛起標識位來判斷是否喚醒掛起執行緒。而如果別的執行緒均讀取到false條件的掛起標識位值,那麼該準備進入掛起狀態的執行緒將不會有執行緒可以喚醒,所以必須執行二次檢查,來防止自己進入一個不會被喚醒的錯誤狀態。 用程式碼來表達上面的觀點就是 ```java //檢查周邊資源,確認執行緒需要進入掛起狀態 while(checkNeedPark()){ //設定掛起標識位為真 needPark = true; //再次檢查 if(checkPark()){ //已經設定好標識位,並且確實需要掛起了。進入掛起狀態。別的執行緒在檢測到掛起標識位時就可以嘗試喚醒 parkThread(); } else{ //清除掛起標識位,再次嘗試。 needPark=false; } } ``` ### MPSC佇列的實現技巧 如果一個佇列有多個生產者併發插入節點,但只有一個節點會取出節點。則稱該佇列為MPSC佇列。該佇列的入隊思路虛擬碼如下 ```java public void inser(Node insertionNode) { //獲取尾節點的物件 Node pre = tailOfQueue; //通過CAS的方式更換尾節點為插入節點。 cas(tailOfQueue,pre,insertionNode); //設定之前的尾節點的next指標為插入節點 pre.next = insertionNode; } ``` 通過這種入隊操作,所有的節點可以不需要加鎖的併發入隊。但是需要注意一點,一個節點的next指標為null,並不意味著它沒有後續節點,也有可能是後續節點還未執行` pre.next = insertionNode `操作。因此判斷是否有後續節點的方式就是判斷當前節點是否是尾節點,不是的話,則說明仍然存在後續節點。 ## 程式碼講解 AQS分為獨佔模式和共享模式。 ### 獨佔模式 先來看獨佔模式。所謂獨佔模式,指的是資源本身,在具體時刻內,最多隻能被一個執行緒持有。 #### 獨佔模式下請求資源 首先是一個獨佔的資源請求 ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` 其中`tryAcquire`方法是子類需要去實現。它的實現內容主要是獨佔式的搶佔對應的資源。因為是獨佔式搶佔,所以在搶佔前需要判斷當前的資源是否符合搶佔的要求。如果搶佔失敗,則嘗試將當前的執行緒放入一個等待佇列。 這裡面需要先解釋下AQS內部的這個佇列。這個佇列本質上也是一個MPSC佇列。但是與單純的MPSC佇列不同的是每一個節點都有一個pre指標,指向它的前繼節點。這樣,佇列便具備了從尾節點開始的向前回溯能力。 下面來看`addWaiter`方法的程式碼 ```java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 首先是按照MPSC入隊演算法進行第一輪嘗試,如果嘗試成功就不需要走路到下面的迴圈流程。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 首先是初始化head和tail。使用了cas是因為這一步存在併發可能。 if (compareAndSetHead(new Node())) tail = head; } else { //重複迴圈,執行上面的入隊演算法,直到成功。 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } ``` 在入隊完畢之後,就進入等待,和不斷被喚醒嘗試搶佔的過程。也就是`acquireQueued`的程式碼表達。 ```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //如果一個節點的前置節點是head節點,則意味著該節點可以進行嘗試搶佔資源。 if (p == head && tryAcquire(arg)) { //搶佔資源成功的情況下,可以設定自身節點為head節點。這裡不存在併發,所以直接設定。 // 從程式碼的全域性來看,佇列中的head節點代表著是當前持有資源的*某一個執行緒*。這裡的某一個是因為在共享模式中,head可能意味著多個持有資源執行緒中的任意一個。而在獨佔模式中,head節點指代的是持有資源的執行緒。這個head節點未必就是當前持有資源執行緒構建的,也有可能是後面的搶佔資源失敗的執行緒在入隊之後構建。無論如何,在獨佔模式中,head節點均代表當前持有資源的執行緒。無論head節點中的thread是否有值。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //由於只有本節點的前置節點為head時才能呼叫該方法,該方法沒有併發,直接設定既可 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ``` 既然是涉及到了執行緒掛起,那麼必然存在一個標識位。AQS選擇將該標識位放在前置節點中。這個標誌位是一個int引數。值為-1(Node.SINGLE)的時候,意味著後續節點處於執行緒掛起狀態。 >關於為何AQS選擇將標識位放在前置節點而非自身節點中。沒找到像Doug lea求證的方式,也就沒有辦法完全知曉。猜測可能是由於後繼節點的不穩定性(後繼節點可能會被取消,以及取消後會自行刪除等。),或者是在釋放的時候讀取node自身的資訊會更加方便的原因。從而AQS選擇將掛起標識位放在了前置節點 下面來看執行掛起執行緒的方法 ```java //這個方法就體現了併發程式設計中掛起執行緒的正規化思想。具體請參照上文。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果前置節點的狀態已經是signal,意味著已經經歷過二次嘗試,可以返回true,讓執行緒進入等待狀態。 return true; if (ws > 0) { //如果前置節點處於取消狀態,則不斷的向前回溯,直到找到一個不是取消狀態的節點。無論如何,至少head節點不會是取消狀態,所以最終一定可以找到一個不是取消狀態的前置節點。然後將該node的pre指標指向該非取消狀態節點。在這個迴圈中就將AQS中的內部佇列的長度縮短了。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //將前置節點的狀態變更為signal。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回false,提示呼叫者執行緒仍然需要繼續嘗試,不可以進入休眠狀態 return false; } //讓執行緒進入等待狀態,並且返回執行緒的中斷資訊 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } ``` 正常情況下的流程走到上面也就結束了,而如果出現了問題,導致最終失敗。則進入節點的取消流程。 ```java private void cancelAcquire(Node node) { if (node == null) return; //首先設定執行緒為null,避免其他地方導致的喚醒。主要是防止浪費 node.thread = null; //整理內部佇列,從該節點開始向前回溯,找到一個不是取消狀態的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; //獲得了新的前置節點的next值。 Node predNext = pred.next; //設定節點狀態為取消。一旦節點進入該狀態則無法變更。而其他的節點變更用的都是cas。所以這裡不需要cas,直接寫入即可。寫入後便不會更改。 node.waitStatus = Node.CANCELLED; //如果自身節點是尾節點,則嘗試將之前發現的前置節點設定為尾節點 if (node == tail && compareAndSetTail(node, pred)) { //如果設定成功,則意味著當前節點之後沒有後繼節點。沒有後繼節點就不需要喚醒 //對前置節點的next值進行cas更替。這裡要採用cas的原因是為了避免對正常的入隊流程造成錯誤影響。 //假設取消執行緒走到這裡,有其他執行緒在入隊,入隊完畢後必然要設定pred節點的next值。無論是否在該cas之前,其他執行緒的入隊操作最終都應該成功。而這裡採用了cas,是為了避免其他執行緒在入隊操作成功後,正確的next值被這裡錯誤的next值覆蓋。因為這裡的predNext是舊值,所以只應該在next仍然是prexNext的時候才能設定為null compareAndSetNext(pred, predNext, null); } else { //如果該節點不是尾節點,或者設定更新尾節點失敗,都意味著該節點還有後繼節點。有後繼結點就應該嘗試執行喚醒操作 //但是如果所有的取消都喚醒後繼結點則意味著不必要的浪費。因此有必要識別出什麼情況下是不需要喚醒的。下面的if判斷就是在識別,什麼情況下,無需喚醒後繼結點。 int ws; //如果前置節點是head節點,那麼就需要幫忙喚醒後繼節點。因為可能前置節點已經釋放了資源,並且喚醒後繼節點也就是該節點。然而前置節點執行喚醒方法時本節點已經進入取消流程。所以這裡要喚醒本節點的後置節點,否則就沒有執行緒去喚醒了。也就意味著喚醒傳播被中斷。造成死鎖。 //如果前置節點在cas嘗試的過程中進入了cancel狀態,那就主動進行喚醒。這種情況也是必須執行喚醒的。假設前置節點進入cancel狀態,假設前置節點喚醒判定時仍然選擇本節點進行喚醒(這是最糟糕的情況,會導致喚醒傳播中斷,如果此時本節點不喚醒後繼結點),而本節點在cancel程序中,自然也不會響應。那如果本節點不喚醒後繼結點,則意味著喚醒傳播整體中斷。 //總結來說,只有在前置節點不是head節點的情況下並且前置節點的狀態成功更新為signal才可以說不要喚醒,單純設定前置節點的next值。因為能正確設定signal就意味著還會有後續流程,喚醒傳播不會斷絕 if (pred != head && ( (ws = pred.waitStatus) == Node.SIGNAL || ( ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) ) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0) //cas操作。因為在程式碼執行的過程中,競爭執行緒可能會走的更快,pred的後繼結點是哪一個都會出現競爭,而如果出現尾節點的競爭,沒有cas就會出現錯誤。具體原因和上面描述想同。 compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } //一旦一個節點狀態進入canceled,則next值失去意義,所以這一步在最後完成。 node.next = node; // help GC } } ``` 喚醒後繼節點是重點,下面來看程式碼 ```java private void unparkSuccessor(Node node) { //通過cas,更換當前節點狀態為0.這樣主要的目的也是讓後繼節點在掛起錢繼續更多嘗試。所以這個cas是否失敗無所謂。但是要注意是需要原生狀態不為cancel的情況下,才能執行這個cas。因為cancel狀態進入後不能改變。 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //尋找合適的後繼節點。如果後繼結點為null或者狀態為取消,則從尾節點開始,不斷的向前回溯。直到找到一個非取消狀態,排位最靠前的節點。然後進行喚醒。從後向前回溯的原因是在該內部佇列內,只有pre指向是正確並且完整的。next指向為null時也許是入隊沒走完,甚至有些時候有可能是不打擾語義的錯誤資料。所以必須使用pre指向來進行回溯。 //喚醒後繼節點可能會遭遇併發,可能一個thread會被多次喚醒。不過這並不會影響到程式語義。所以沒有關係。 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } ``` #### 獨佔模式下的釋放資源 首先來看下釋放的整體程式碼 ```java public final boolean release(int arg) { //子類需要實現的獨佔式釋放鎖 if (tryRelease(arg)) { //獨佔模式中,持有資源的執行緒對應的必然是head節點 Node h = head; //如果head節點為null,那就意味著在該執行緒持有資源到釋放資源這段時間都沒有競爭,自然head節點為null,也就是沒有後繼結點 //如果head節點不為空,並且不是初始化狀態。就嘗試喚醒head節點的後繼節點。 //如果head節點的狀態是0,那就不需要喚醒,因為後繼節點會二次重試,不會陷入掛起。 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` 獨佔式獲取,還有一種情況就是帶超時時間的獨佔獲取。其實基本思路都是一致,只不過線上程掛起的時候,不再是永久性的掛起,而是有超時時間的掛起。看下面的程式碼 ```java public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //進行嘗試獲取,失敗則嘗試嘗試超時獲取 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //主要就是在這句話上,這個api呼叫的執行緒掛起是存在超時時間的,時間過後執行緒會自動恢復。 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } ``` ### 共享模式 共享模式對於獨佔模式而言,最主要的區別就是同一時刻,持有資源的執行緒可以超過1個。 #### 請求資源 首先來看下共享模式下請求資源的程式碼 ```java public final void acquireShared(int arg) { //一個需要子類實現的共享獲取嘗試。這個方法要求子類實現一種共享模式下的資源請求。說白了,其實就是資源的總數大於1,因而可以同時存在多個執行緒持有該資源。方法返回負數意味著請求資源失敗,開始進入到入隊操作。 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //這個方法大致上看起來和獨佔模式是很相像的。區別只在於獨佔模式下,在本方法中獲取到資源後,只是將本節點設定為head節點。而共享模式下,設定完head節點後,還需要做更多的事情。 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // tryAcquireShared按照jdk文件中的註解應該返回是資源的剩餘數。如果大於0,顯然應該讓後面等待的執行緒也參與獲取。而等於0的時候其實只是需要將本節點設定為head既可。不過那樣會讓程式碼變的麻煩。所以這裡只要返回結果非負,就執行設定head節點和傳播的語句。 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); //什麼情況下要喚醒後繼結點? //1.資源剩餘數大於0,有剩餘資源肯定是要喚醒後繼結點的 //2.頭結點不存在。可以看到這個條件會與後文的head!=null相沖突。而且實際上在這個方法執行的時候,head節點是必然存在的,不可能為null。留待下篇文章再做解讀。 //3.頭結點狀態小於0.這裡只有兩種可能,一種是SIGNAL(-1),一種PROPAGATE(-3)。這兩種數值的出現都意味著後繼節點要求node(也就是當前head)喚醒後繼結點 if ( propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //這裡的if判斷並非是一個無意義的防禦性程式設計。在AQS的實現類中,存在著所謂讀寫鎖。也就是說在AQS的內部佇列,共享節點和獨佔節點是都存在的。所以共享喚醒傳播到獨佔節點就要停止了。 if (s == null || s.isShared()) doReleaseShared(); } } ``` 下面來看下具體的共享喚醒實現。 ```java private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //由於該方法可能被併發呼叫,為了避免不必要的喚醒浪費,因為通過cas來搶佔喚醒權利。搶佔成功者執行真正的後繼結點喚醒任務。如果失敗則進行重試 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } //如果ws==0,可能是以為head結點已經沒有後繼結點,也有可能是因為head節點的後繼結點喚醒權被其他執行緒剛搶佔成功。 //如果沒有後繼結點,顯然不需要做任何事情 //如果是喚醒權被其他執行緒搶佔,則不需要做任何事情。因為喚醒是在佇列上進行傳播的。所以這裡就cas將head節點的狀態值修改為 PROPAGATE。用來表達該執行緒喚醒操作意圖已經傳達。但是會由別的執行緒真正的執行後續的喚醒動作。同樣,如果失敗了,則重試。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; //其實這裡缺少一個else if.用來對h.waitstatus== PROPAGATE做條件說明。如果發現節點狀態是 PROPAGATE,說明已經有其他執行緒嘗試搶奪喚醒權失敗,也就是已經有執行緒真正持有了喚醒權,那麼本執行緒就不需要做什麼了。 } //如果在上面的流程結束後發現head節點沒有變化,意味著本次修改過程佇列前端是無變化的。因此方法到這裡結束 //而如果head節點發生了改變,則需要重試上面的過程。這個重試是否是必要的?筆者因為沒有找到如果不使用這段程式碼會導致問題的場景,故而對這段程式碼的合理性表示存疑。 //目前來看,這個條件只是用來單純的簡單退出。使用這個條件比較簡單粗暴,就忽略細分場景下的錯誤情況。統一的,如果頭結點未變化就退出。會造成一定程度的喚醒浪費。 if (h == head) break; } } ``` #### 釋放資源 共享模式下釋放資源比較簡單。程式碼呼叫如下 ```java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } ``` 程式碼邏輯比較簡單。相關涉及到的方法也都講解過,這裡不再二次展開了。 ### 條件變數 在AQS的內部類中,有一個內部類實現了`Condition`介面。這裡我們來看下具體的介面方法實現 #### await方法 這個方法用於在獲取了等待某一個條件滿足,在等待時會釋放掉當前持有的鎖。下面來看程式碼實現 ```java public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //建立一個建立一個條件節點,並且放入條件佇列 Node node = addConditionWaiter(); //將當前持有的鎖完全釋放 int savedState = fullyRelease(node); int interruptMode = 0; //如果當前節點不在AQS的內部佇列中,則保持等待狀態。除非執行緒中斷,或者是節點取消。 //節點實際上是會被signal方法從條件佇列移動到等待佇列。 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //進入鎖資源爭奪 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //爭奪到鎖資源後,幫忙清除條件佇列中已經取消的節點 if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; //如果發現尾節點已經處於取消狀態,則清理該條件佇列上的節點。 if (t != null && t.waitStatus != Node.CONDITION) { //清除條件佇列上所有狀態為cancel的節點。具體實現不復雜,這裡不展開說明 unlinkCancelledWaiters(); t = lastWaiter; } //為本執行緒建立一個節點,並且放到lastWaiter的next位置上,然後設定lastWaiter的值。因為這個方法在持有鎖的情況下執行,所以不需要擔心併發。 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } final int fullyRelease(Node node) { try { int savedState = getState(); if (release(savedState)) return savedState; } catch (RuntimeException ex) { node.waitStatus = Node.CANCELLED; throw ex; } //如果程式走入到這裡,說明上面的釋放返回了false。則意味著調用出現了異常。多半是因為沒有獲取持有鎖的情況下,就執行await操作導致。 node.waitStatus = Node.CANCELLED; throw new IllegalMonitorStateException(); } ``` #### signal方法 該方法用於喚醒在某個條件上等待的某一個執行緒 ```java public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //只有一種情況會失敗,該節點進入了cancel狀態。 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; //前置節點已經取消,則喚醒該節點執行緒執行一些同步動作。比如節點連線到合適的前置節點 //更換前置狀態為signal失敗,有可能是因為進入了取消狀態。此時要喚醒該節點的執行緒,讓其自行執行一些同步動作。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } ``` ## 總結 ### 為什麼需要SIGNAL狀態 在共享喚醒中,多執行緒併發爭奪喚醒權,必然存在一個cas的過程。也就是需要一個從有狀態值cas到0的過程。所以要存在這樣的一個狀態值,最後就是SIGNAL了。從另外一個角度來看,節點一旦進入取消狀態就不可恢復,因此需要存在一個不同的狀態用來表示該節點需要喚醒,這也就是signal。 ### 為什麼需要PROPAGATE狀態 在共享喚醒中,所有的節點都不斷的搶奪喚醒權是沒有意義而且浪費的。同時需要一個與初始狀態不同的狀態用來表達多執行緒競爭喚醒權的結果。因為從SIGNAL到0是表示喚醒權被某一個執行緒搶奪完成,因此需要有一個額外的狀態可以用來通知其他競爭執行緒可以停止競爭了。所以就有了 PROPAGATE