學習JUC原始碼(1)——AQS同步佇列(原始碼分析結合圖文理解)
前言
最近結合書籍《Java併發程式設計藝術》一直在看AQS的原始碼,發現AQS核心就是:利用內建的FIFO雙向佇列結構來實現執行緒排隊獲取int變數的同步狀態,以此奠定了很多併發包中大部分實現基礎,比如ReentranLock等。今天又是週末,便來總結下最近看的消化後的內容。
主要參考資料《Java併發程式設計藝術》(有需要的小夥伴可以找我,我這裡只有電子PDF)結合ReentranLock、AQS等原始碼。
博文中的流程圖,結構圖等都是我理解之後一步步親自畫的,如果轉載,請標明謝謝!
一、同步佇列的結構與實現
1、同步佇列的結構
(1)結構介紹
AQS使用的同步佇列是基於一種CLH鎖演算法來實現
CLH鎖也是一種基於連結串列的可擴充套件、高效能、公平的自旋鎖,申請執行緒只在本地變數上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋;
結點之間是通過隱形的連結串列相連,之所以叫隱形的連結串列是由於這些結點之間沒有明顯的next指標,而是通過myPred所指向的結點的變化情況來影響myNode的行為;
當一個執行緒須要獲取鎖時,會建立一個新的QNode。將當中的locked設定為true表示須要獲取鎖。然後執行緒對tail域呼叫getAndSet方法,使自己成為佇列的尾部。同一時候獲取一個指向其前趨的引用myPred,然後該執行緒就在前趨結點的locked欄位上旋轉。直到前趨結點釋放鎖。
當一個執行緒須要釋放鎖時,將當前結點的locked域設定為false,同一時候回收前趨結點。執行緒A須要獲取鎖。其myNode域為true。些時tail指向執行緒A的結點,然後執行緒B也增加到執行緒A後面。tail指向執行緒B的結點。然後執行緒A和B都在它的myPred域上旋轉,一旦它的myPred結點的locked欄位變為false,它就能夠獲取鎖。
而在原始碼中也有這樣的介紹:
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks.
* ...........
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
* ..............
在AQS中的同步佇列結構以及獲取/釋放鎖都是基於此實現的,這裡我們先放一個我畫的基本結構來理解AQS同步佇列,再進一步介紹一些細節。
根據以上圖我們看到:
- 該佇列是雙向FIFO佇列,每個節點都有pre、next域;
- 同步器包含了兩個節點型別的引用,一個指向頭結點,一個指向尾節點;
- 新加入執行緒被構造成Node通過呼叫compareAndSetTail加入同步佇列中;
- 使用setHead(Node node)設定頭結點,指向佇列頭。使用compareAndSetTail(Node exceptNode, Node updateNode)指向佇列尾節點。
在原始碼中我們可以看到:
// 內部類Node節點 static final class Node{...} // 同步佇列的head引用 private transient volatile Node head; // 同步佇列的tail引用 private transient volatile Node tail;
(2)節點構成
那麼Node結構的具體構成是什麼呢?我們具體看內部類Node的原始碼:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** 等待狀態: * 0 INITAIL: 初始狀態 * 1 CANCELLED: 由於等待超時或者被中斷,需要從同步佇列中取消等待,節點進入該狀態不會被改變 * -1 SIGNAL: 當前節點釋放同步狀態或被取消,則等待狀態的後繼節點被通知 * -2 CONDITION: 節點在等待佇列中,執行緒在Condition上,需要其它執行緒呼叫Condition的signal()方法才能從等待隊轉移到同步佇列 * -3 PROPAGATE: 表示下一個共享式同步狀態將會無條件被傳播下去 */ volatile int waitStatus; /** 前驅結點 */ volatile Node prev; /** 後繼節點 */ volatile Node next; /** 獲取同步狀態的執行緒 */ volatile Thread thread; /** 等待佇列中的後繼節點 */ Node nextWaiter; /** 判斷Node是否是共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** 返回前驅結點 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
從原始碼中可以發現:同步佇列中的節點Node用來儲存獲取同步狀態失敗的執行緒引用、等待狀態以及前驅和後繼節點。
節點是構成同步佇列的基礎,沒有成功獲取同步狀態的執行緒將成為節點加入該佇列的尾部。當一個執行緒無法獲取同步狀態時,會被構造成節點並加入同步佇列中,通過CAS保證設定尾節點這一步是執行緒安全的,此時才能認為當前節點(執行緒)成功加入同步佇列與尾節點建立聯絡。具體的實現邏輯請看下面介紹!
2、同步狀態獲取與釋放
(1)獨佔式同步狀態獲取與釋放
通過呼叫同步器acquire(int arg)方法可以獲取同步狀態,該方法中斷不敏感,也就是由於執行緒獲取同步狀態失敗後進入同步佇列中,後序執行緒對進行中斷操作時,執行緒不會從同步佇列中移出
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
同步狀態獲取主要的流程步驟:
1)首先呼叫自定義同步器實現tryAcquire(int arg)方法,該方法保證執行緒安全的獲取同步狀態。
2)如果獲取失敗則構造同步節點(獨佔式Node.EXCLUSIVE)並通過addWaiter(Node ndoe)方法將該節點加入到同步佇列的尾部,同時enq(node)通過for(;;)迴圈保證安全設定尾節點。
private Node addWaiter(Node mode) { // 根據給定模式構造Node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 嘗試在尾部新增 if (pred != null) { node.prev = pred; // cas方式保證正確新增尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // enq主要是通過for(;;)死迴圈來確保節點正確新增 // 在for(;;)死迴圈中,通過cas將節點設定為尾節點時,才返回;否則一直嘗試設定 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize 當tail節點為null時,必須初始化構造好 head節點 if (compareAndSetHead(new Node())) tail = head; } else { // 否則就通過cas開始新增尾節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
假設原佇列中存在Node-1到Node-4節點,此時某個執行緒獲取同步狀態失敗則構成成Node-5通過CAS方式加入佇列(下圖忽略自旋環節)。
3)節點進入同步佇列之後“自旋”,即acquireQueued(final Node node, int arg)方法,在這個方法中,當前node死迴圈嘗試獲取鎖狀態,但是隻有node的前驅結點是Head才能嘗試獲取同步狀態,獲取成功之後立即設定當前節點為Head,併成功返回。否則就會一直自旋。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 當前node節點的前驅是Head時(p == head),才能有資格去嘗試獲取同步狀態(tryAcquire(arg)) // 這是因為當前節點的前驅結點獲得同步狀態,才能喚醒後繼節點,即當前節點 if (p == head && tryAcquire(arg)) { // 以上條件滿足之後 setHead(node); // 設定當前節點為Head p.next = null; // help GC // 釋放ndoe的前驅節點 failed = false; return interrupted; } // 執行緒被中斷或者前驅結點被釋放,則繼續進入檢查:p == head && tryAcquire(arg if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
此時新加入的Node-5節點也開始自旋,此時的Head(Node-1)已經獲取到了同步狀態,而Node-2退出了自旋,成為了新的Head。
文字總結:
1)同步器會維護一個雙向FIFO佇列,獲取同步失敗的執行緒將會被構造成Node加入隊尾(並且做自旋檢查:檢查前驅結點是否是Head);
2)當前執行緒想要獲得同步狀態,前提是其前驅結點是頭結點,並且獲得了同步狀態;
3)當Head呼叫release(int arg)釋放鎖的同時會喚醒後繼節點(即當前節點),後繼節點結束自旋
流程圖總結:
同步器的release方法:釋放鎖的同時,喚醒後繼節點(進而時後繼節點重新獲取同步狀態)
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 該方法會喚醒Head節點的後繼節點,使其重試嘗試獲取同步狀態 unparkSuccessor(h); return true; } return false; }
UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)喚醒處於等待狀態的執行緒(之後會慢慢看原始碼介紹)。
(2)共享式同步狀態獲取與釋放
共享鎖跟獨佔式鎖最大的不同就是:某一時刻有多個執行緒同時獲取到同步狀態,獲取判斷是否獲取同步狀態成功的關鍵,獲取到的同步狀態要大於等於0。而其他步驟基本都是一致的,還是從原始碼開始分析起:帶字尾Share都為共享式同步方法。
1)acquireShared(int arg)獲取同步狀態:如果獲取失敗則加入隊尾,並且檢查是否具備退出自旋的條件(前驅結點是頭結點並且能成功獲取同步狀態)
public final void acquireShared(int arg) { // tryAcquireShared 獲取同步狀態,大於0才是獲取狀態成功,否則就是失敗 if (tryAcquireShared(arg) < 0) // 獲取狀態失敗則構造共享Node,加入佇列; // 並且檢查是否具備退出自旋的條件:即preNode為head,並且能獲取到同步狀態 doAcquireShared(arg); }
2)doAcquireShared(arg):獲取失敗的Node加入佇列,如果當前節點的前驅結點是頭結點的話,嘗試獲取同步狀態,如果大於等於0則在for(;;)中退出(退出自旋)。
private void doAcquireShared(int arg) { // 構造共享模式的Node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // 前驅節點是頭結點,並且能獲取狀態成功,則return返回,退出死迴圈(自旋) if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
3)releaseShared(int arg):釋放同步狀態,通過loop+CAS方式釋放多個執行緒的同步狀態。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 通過loop+CAS方式釋放多個執行緒的同步狀態 doReleaseShared(); return true; } return false; }
二、自定義同步元件(實現Lock,內部類Sync繼承AQS)
1、實現一個不可重入的互斥鎖Mutex
2、實現指定共享數量的共享鎖MyShareLock
--------------------------------未完待續(為了加深理解畫圖寫程式碼花費時間較長,所以慢慢來保證質量,不著急!)-------------------------------------------