1. 程式人生 > 實用技巧 >AbstractQueuedSynchronizer(AQS)抽絲剝繭深入瞭解JUC框架原理

AbstractQueuedSynchronizer(AQS)抽絲剝繭深入瞭解JUC框架原理

目錄

簡介

AQS(AbstractQueuedSynchronizer)是併發開發中一個基礎元件。主要實現了同步狀態管理、執行緒佇列管理、執行緒等待、執行緒喚醒等底層操作。JDK中許多的併發類都是依賴AQS的。 ReentrantLock(可重入鎖)、Semaphore(訊號量)、CountDownLatch(計數器)。

Lock簡單實用

  • 介紹原理前我們簡單來看看Lock使用。

public static void main(String[] args) {
Integer index = 0;
ReentrantLock lock = new ReentrantLock();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
System.out.println(finalI);
lock.unlock();
}
});
threadList.add(thread);
}
for (Thread thread : threadList) {
thread.start();
}
}
  • 就是lock 和unlock的使用。就能夠保證中間的業務是有序執行的。上面不會保證輸出數字有序,但是能保證輸出的個數是100個,因為這裡我們理解成他們會進入佇列中。但是進入的順序不確定。那麼下面我們看看lock 、unlock 與我們今天的主角AQS有什麼關係。

主體框架

AQS提供了一個依賴FIFO(先進先出)等待佇列的阻塞鎖和同步器的框架。該類是一個抽象類。其中暴露出來的方法主要用來操作狀態和類別判斷。這些方法我們不需要考慮阻塞問題,因為在AQS中呼叫這些方法的地方會處理阻塞問題

方法 描述
boolean tryAcquire(int args) 嘗試獲取獨佔鎖
boolean tryRelease(int args) 嘗試釋放獨佔鎖
int tryAcquireShared(int args) 嘗試獲取共享鎖
boolean tryReleaseShared(int args) 嘗試釋放共享鎖
boolean isHeldExclusively() 當前執行緒是否獲得了獨佔鎖

其他方法有AQS類實現。在AQS中實現的方法會呼叫到上面的抽象方法。正常子類是已內部類方式呈現的。這樣的好處可以做到封閉式的同步屬性。AQS內部實現的方法大概介紹

方法 描述
void acquire(int args) 獲取獨佔鎖,內部呼叫tryAcquire方法,
void acquireInterruptibly(int args) 響應中斷版本的acquire
boolean tryAcquireNanos(int args , long nanos) 響應中斷+超時版本的acquire
void acquireShared(int args) 獲取共享鎖,內部呼叫tryAcquireShared方法
void acquireSharedInterruptibly(int args) 響應中斷版本的獲取共享鎖
boolean tryAcquireSharedNonos(int args,long nanos) 響應中斷+超時獲取共享鎖
boolean release(int args) 釋放獨佔鎖
boolean releaseShared(int args) 釋放共享鎖
Collection getQueuedThreads() 獲取同步佇列上的執行緒集合

原理解析

AQS內部是通過一個雙向連結串列來管理鎖的(俗稱CLH佇列)。

當前程嘗試獲取鎖失敗時,會將當前執行緒包裝成AQS內部類Node物件加入到CLH佇列中,並將當前執行緒掛起。當有執行緒釋放自己的鎖時AQS會嘗試喚醒CLH佇列中head後的直接後繼的執行緒。AQS的status我們可以根據他來做成不同的需求。這個後續再說。下面我們已ReentrantLock來說明下AQS原理。

  • 上面標註的是ReentrantLock中的lock方法。這個方法表示去上鎖。瞭解Lock的都知道這個方法會一直阻塞住知道上鎖成功才會執行完。而ReentrantLock.lock方法實際上的sync物件去上鎖的。而sync在ReentrantLock中有公平鎖和非公平鎖兩種。

  • 在AQS中預設的是非公平鎖,即隨機喚醒執行緒。



  • 通過上面繼承關係我們發現了我們今天的主角-AbstractQueueSynchronizer 。

  • NonfairSync實現了兩個方法lock、tryAcquire方法。其中lock就是通過狀態位實現鎖機制的。0-未上鎖;1-已上鎖 。 lock的邏輯就是如果上鎖成功會將狀態置為1且設定獨佔模式的所屬執行緒為當前執行緒。否則呼叫acquire嘗試獲取鎖。

獨佔鎖

AQS資料結構

  • AQS裡面主要是狀態位的管理。下面我們看看包含的屬性

Class AbstractQueuedSynchronizer{
/*佇列中的頭結點,無實際意義,head的後繼節點才是佇列中的第一個節點*/
private transient volatile Node head;
/*佇列中的尾節點*/
private transient volatile Node tail;
/*佇列中的狀態,上鎖解鎖 可以擴充套件成不同的狀態 。 AQS實際上也是對該欄位的管理。子類中通過get set compare方法對state的管理*/
private volatile int state;
}

CLH資料結構

  • 上面我們瞭解到會將執行緒包裝成Node物件加入到雙向連結串列(CLH)中。下面我們看看Node的結構吧

static final class Node {
/*共享模式的標記*/
static final Node SHARED = new Node();
/*獨佔模式的標記*/
static final Node EXCLUSIVE = null;
/*佇列等待狀態-取消*/
static final int CANCELLED = 1;
/*佇列等待狀態-喚醒*/
static final int SIGNAL = -1;
/*佇列等待狀態-條件等待*/
static final int CONDITION = -2;
/*佇列等待狀態-廣播*/
static final int PROPAGATE = -3;
/*佇列等待狀態,取值範圍就是上面的等待狀態之一*/
volatile int waitStatus;
/*前驅節點*/
volatile Node prev;
/*後繼節點*/
volatile Node next;
/*節點對應的執行緒:繫結關係*/
volatile Thread thread;
/*TODO*/
Node nextWaiter;
/*判定是否是共享模式*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/*獲取當前節點的前驅節點,如果沒有前驅節點丟擲NullPointerException*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/*用於建立雙向連結串列中的Head節點,其實Head節點就是一個標誌並不會與執行緒掛鉤。相當於一個佇列的預設頭節點。或者用來建立共享模式的節點。因為共享模式的節點就是無參構造*/
Node() {
}
/*將執行緒包裝成Node物件加入佇列中,原始碼中是用來新增Thread至佇列*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/*常用語加入條件狀態佇列中TODO*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

acquire實現步驟

  • 上面我們瞭解到Lock中實現lock的底層是AQS的acquire實現的。

  • 通過檢視原始碼我們大概能瞭解到其上鎖的流程,

    • 首先嚐試獲取鎖
    • 獲取鎖失敗後,將當前執行緒包裝成Node物件新增到CLH佇列中
    • 自行阻塞當前執行緒,等待佇列喚醒自己

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

addWaiter

/**
* 通過Node物件的建構函式構造Node物件新增到CLH佇列中
* 這個方法主要是雙向連結串列的操作。C++的同學應該會很容易理解
*/
private Node addWaiter(Node mode) {
/*當前執行緒加入佇列後此時是沒有後繼節點的,且已獨佔模式訪問的
*所以這裡加入的Node在上一不傳入的是Node.EXCLUSIVE,這裡就表示
*是已獨佔模式進行上鎖從而進行加入佇列的
*/
Node node = new Node(Thread.currentThread(), mode);
/*獲取佇列中的最後一個Node節點;這裡是進行快速插入測試。
*預設佇列已經在堆積Node節點了這個時候直接將節點追加到tail裡。
*其實這裡和enq()方法是一樣的邏輯。只不過enq裡面會進行等待佇列
*正常才會加入
*/
Node pred = tail;
if (pred != null) {
/*佇列已經產生執行緒等待就會將當前node節點的前驅節點只為tail
*的複製節點
*/
node.prev = pred;
/*基於CAS(內部UnSafe實現)設定尾部為node節點*/
if (compareAndSetTail(pred, node)) {
/*原本的tail節點的後繼節點自然就是node節點*/
pred.next = node;
/*到這裡node節點就已經加入了CLH佇列中*/
return node;
}
}
/*邏輯同上,不在贅述*/
enq(node);
return node;
}

acquireQueued

  • 這裡傳的Node是我們上一步剛剛新增到隊尾的節點。為什麼不直接用tail節點呢?我們仔細觀察發現tail的修飾

private transient volatile Node tail;
  • 我們知道volatile是記憶體可見的。什麼叫記憶體可見。我們的屬性變數是儲存在記憶體中的。每次有執行緒啟動訪問這個類的時候都會複製記憶體中屬性值到自己執行緒中。所以在多執行緒情況下修改了這個屬性就會出現問題因為A執行緒修改了值但是B執行緒並無法感知還是以原先的值進行互動。這就是典型的多執行緒帶來的問題。而volatile做到了的執行緒感知。當A執行緒修改了tail後立馬B執行緒就感知到了。但是這並不能徹底的解決多併發的問題。這裡我們簡單介紹下這個關鍵字
  • 經過上面簡單闡述高併發場景,所以這裡不能直接用tail。因為這個時候tail很有可能已經不是我們的tail的。這裡直接傳遞Node節點是非常明智的選擇。而且是final修飾的。更加保證了使我們上一步驟新增到隊尾的那個節點

/**
* 再次嘗試獲取鎖,對中斷不敏感。
*/
final boolean acquireQueued(final Node node, int arg) {
/*失敗標誌位*/
boolean failed = true;
try {
/*執行緒是否被打斷標誌位*/
boolean interrupted = false;
/**/
for (;;) {
/*獲取當前想成包裝的Node節點的前驅節點*/
final Node p = node.predecessor();
/*如果前驅節點是head節點表示當前節點在隊首可以嘗試
*獲取下鎖,這裡為什麼是嘗試獲取呢因為這個時候可能鎖
*還被其他執行緒佔著。這裡嘗試獲取純粹就是試試機會
*/
if (p == head && tryAcquire(arg)) {
/*成功獲取到鎖,說明我們試一試的心態成功了。
*人生也一樣,總得試一試萬一成功了呢。看原始碼還
*能學到人生道理呢。劃重點
*/
/*這個時候在tryAcquire中已經被當前執行緒佔用了鎖了。
*我們這裡不需要擔心其他執行緒會搶佔,這個時候我們
*需要將當前執行緒從佇列中踢出,直接將當前執行緒置為
*head節點。setHead方法也很簡單,將node的前驅節
*點置為null,因為head是首位,首位之前不應該在
*有節點了,然後執行緒也被銷燬了
*/
setHead(node);
/*p節點是老的head節點這個時候已經不需要了。
*這裡jdk的操作是將next至為null, 這樣p節點
*就成為不可達狀態,接下來的命運就是等待被GC。
*這裡我們不是將p置為null的原因是我們p=null ,
*只是將p指向null, 但是原先的head的那個Node的
*地址任然通過Node進行指向,GC是無法回收的。好好理解下*/
p.next = null; // help GC
/*這裡我們已經獲取了。而且成功上了鎖。所以這裡就
* 無法取消獲取了,而且我們已經將Node剔除了,也
* 沒有必要再進行取消獲取操作了。所以在finnally中
* 就沒必要執行了*/
failed = false;
/*返回執行緒是否被中斷狀態*/
return interrupted;
}
/*如果當前執行緒對應的Node節點不是head的後繼節點或者
* 沒有獲取到鎖,這個時候我們開始阻塞執行緒*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
/*取消當前執行緒對應的Node節點在佇列中排隊。這裡可以
*理解成棄權操作。這裡取消會順便遍歷之前的節點如果
* 有棄權的這裡會一併操作掉
*/
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire


/**
* 在失敗獲取鎖的情況下判斷是否需要對執行緒進行阻塞並同意修改執行緒
* 在佇列中狀態。如果前驅節點是SIGNAL狀態那麼node節點就進入
* 準備狀態。前驅節點CANEL狀態需要剔除。如果是CONDITION或者
* PROGAGATE狀態,在ReentrantLock中我們暫時不考慮這兩者情況,
* 所以這裡就強制轉換為SIGNAL狀態
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/*獲取前驅節點的狀態*/
int ws = pred.waitStatus;
/*如果前驅節點是等待通知狀態,那麼當前節點需要等待前驅
* 結點被喚醒,所以這裡需要被阻塞
*/
if (ws == Node.SIGNAL)
return true;
/*如果前驅節點>0,即為canclled狀態*/
if (ws > 0) {
//這裡其實和cancelAcquire邏輯差不多,需要將取消的節點從佇列中剔除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*剩下的情況,統一將節點狀態更正為等待通知狀態*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt


/**
* 阻塞當前執行緒,等待被喚醒
*/
private final boolean parkAndCheckInterrupt() {
/*這裡就是阻塞執行緒,並等待LockSupport.unpark喚醒*/
LockSupport.park(this);
/*在park之後我們需要Thread.interrupted恢復下執行緒的中斷狀態,
* 這樣下一次park才會生效。否則下一次的park不會生效的
*/
return Thread.interrupted();
}

cancelAcquire


/**
* 將node節點之前(包括當前node)取消狀態的全部剔除
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
/*剔除操作需要解綁node和thread關係*/
node.thread = null;
/*獲取node的前驅節點*/
Node pred = node.prev;
/*大於0就是取消狀態*/
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
/*這裡直接置為取消狀態,是為了方便其他執行緒進行取消是的操作,
* 也是為了方便跳躍該節點
*/
node.waitStatus = Node.CANCELLED;
/*如果node是隊尾的haul,那麼將隊尾設定成node的前驅結點*/
if (node == tail && compareAndSetTail(node, pred)) {
/*將隊尾的pred節點的後繼節點置空,這是一個佇列的標準要求*/
compareAndSetNext(pred, predNext, null);
} else {
//如果是非隊尾節點
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
/*pred節點狀態如果是有效節點且不是head,將pred的後繼
* 節點指向node的後繼節點。這裡和C++指標指向是一個道理*/
Node next = node.next;
if (next != null && next.waitStatus <= 0)
/*node的後繼節點是有效節點且不是取消狀態,進行替換*/
compareAndSetNext(pred, predNext, next);
} else {
/*
* 這裡就是對上面提到的阻塞進行放行。裡面
* 實際上是LockSupport.unpark進行放行的。
* 這個時候我們通過上面的if知道,這個時候在以下幾種場景出現
* 1、pred==head
* 2、pred是取消狀態
* 3、pred.thread==null 即不是有效節點
* 以上這些情況都表示pred不是能進行喚醒的節點,我們
* 這裡理解為不是標準節點。這個時候為了保證佇列的活躍性,
* 我們需要喚醒後繼節點,實際上就是node的後繼節點。
*/
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
  • 在上面程式碼中當程式碼執行到unparkSuccessor(node)這一塊時就會去喚醒node節點。但是我們的canelAcquire方法是為了取消node節點之前取消狀態的節點的。這樣就會與我們功能違背。命名方法是為了剔除canel節點。現在確實去喚醒node節點。這裡我們上面shouldParkAfterFailedAcquire方法中在狀態>0時回去自動剔除這些節點的。這樣就實現了canelAcquire方法的功能了。所以我們不需要糾結。

    ps: 原始碼終究是原始碼,考慮的是非常全面的。

if (ws > 0) {
//這裡其實和cancelAcquire邏輯差不多,需要將取消的節點從佇列中剔除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}

unparkSuccessor


/**
* 喚醒node節點
*/
private void unparkSuccessor(Node node) {
/*獲取當前節點的狀態*/
int ws = node.waitStatus;
/*對狀態進行判斷*/
if (ws < 0)
/*若果小於0,則進行強制糾偏為0*/
compareAndSetWaitStatus(node, ws, 0);
/*獲取當前節點的後繼節點*/
Node s = node.next;
/*判斷*/
if (s == null || s.waitStatus > 0) {
/*後繼節點為有效節點且狀態>0 , 這裡即為CANCELLED狀態,
* 則將該節點在CLH中剔除,並進行斷層連線*/
s = null;
/*這裡和向前去除取消狀態的前驅節點一樣,只不過這裡是向後
*至於為什麼是從後向前呢,是為了避免高併發帶來的節點不一
* 致性。因為從node開始往後的話,很有可能後面會被其他
* 執行緒修改了。因為新增節點的往後新增的。所以從後往前的話這樣能保證資料一致。但是這樣就會導致其他執行緒新增的節點是無法訪問到的。這一點和資料一致性比較還是前者比較重要。此次獲取不到沒關係,在獲取鎖的時候jdk使用的是for迴圈。會不停的檢查佇列中節點是否可以被喚醒的。這裡我們理解是一個定時器。所以一次獲取不到節點沒關係。總有一次會被喚醒。
*/
for (Node t = tail; t != null && t != node; t = t.prev)
/*head節點狀態應該是0,所以這裡最後s就是head.所以後面釋放* 的就是head的後繼節點。*/
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
/*這裡對應的是parkAndCheckInterrupt中的
* LockSupport.lock(this)方法。unpark
* 之後parkAndCheckInterrupt方法就會執行到Thread.interrupted
* 並進行返回,這個時候回返回true*/
LockSupport.unpark(s.thread);
}

acquire

  • 到這裡acquire執行步驟我們按照方法維度一一進行閱讀了。我們大概梳理下就是第一步獲取鎖,獲取失敗就會加入佇列,這個時候該執行緒會被阻塞,在加入佇列的過程中會進行鍼對佇列進行無效節點去除(取消狀態或者引數null等情況)。保證佇列裡的node都是有效且活躍的節點。這個過程會保證佇列是運轉的。如果加入佇列順利的話下一步就是自行的中斷執行緒進行掛起Thread.currentThread().interrupt();,其實執行到這一步就表示這個執行緒已經不需要了。被取消了。後續會將這個執行緒作廢。

下面貼出一個來自於部落格園大神的原理圖

release

  • 獲取獨佔鎖的邏輯還是很複雜的,裡面涉及到操作雙向連結串列的操作,如果沒有接觸過C++應該還是很吃力的。其實在獲取的邏輯中已經牽涉了釋放的邏輯。在我們喚醒node的後繼節點其實也是釋放邏輯中的重頭戲。




public final boolean release(int arg) {
/*會呼叫tryRelease,這個方法是有子類實現的。我們在ReentrantLock
* 中應該是非公平鎖實現的tryRelease。這個方法後面會說。
* 這裡我們需要提一點的:當一個執行緒獲取到鎖時,它對應的Node是
* 不會再佇列中的。所以這裡釋放我們可以理解成喚醒Head的後繼節點。
* 這裡就和上面喚醒node的後繼節點一樣了。所以你會看到同樣的
* 方法*unparkSuccessor(h)
*/
if (tryRelease(arg)) {
/*獲取CLH佇列中的隊首節點*/
Node h = head;
if (h != null && h.waitStatus != 0)
/*喚醒head節點的後繼節點*/
unparkSuccessor(h);
return true;
}
return false;
}
  • 這裡需要解釋下為什麼會對head節點進行判斷。因為AQS中head預設的null。那麼head是什麼建立的呢。是在我們上面加鎖的時候加入,在加入佇列後需要進行前驅結點判斷的時候建立head的。這個時候的head沒有設定狀態。那麼這個狀態是預設0的。所以上面判斷只需要判空就行了。但是為了嚴謹JDK進行雙重判斷了。

private transient volatile Node head;
  • 所以這裡需要對head進行判空。

tryRelease

  • 其實在上面acquire步驟講解中,我們漏掉了tryAcquire方法的閱讀。目的是為了和tryRelease方法進行合併講解。因為這兩個方法都是交由子類實現的。放在一起講我們更加能理解設計 。 在ReentrantLock中tryAcquire是有非公平鎖的nonfairTryAcquire實現的

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
/*首先獲取獨佔鎖的state*/
int c = getState();
if (c == 0) {
/*c==0表示當前獨佔鎖沒有被任何執行緒佔用。這個時候是可以加鎖的*/
if (compareAndSetState(0, acquires)) {
/*設定當前擁有次所的執行緒為當前執行緒*/
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
/*因為這個判斷,實現了可重入式的鎖,這樣一個執行緒可以重複上鎖操作。*/
/*c!=0表示已有執行緒佔用。如果是當前執行緒的表示被重入了。那麼這個獨佔鎖state就會繼續累加。這裡的state是AQS的state和Node裡面waitStaus是兩回事。在這裡累加在釋放方法裡就是遞減。這樣對比我們就容易理解了。這裡的status不同的實現有著不同的定位功能*/
int nextc = c + acquires;
if (nextc < 0) // overflow
/*這裡的判斷著實沒有看懂。希望大神指點。*/
throw new Error("Maximum lock count exceeded");
/*CAS設定state*/
setState(nextc);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
/*看完tryAcquire中遞增的操作,我們就能理解這裡遞減的邏輯了*/
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
/*c==0表示這個執行緒因為可重入的上鎖方式,完全的釋放的獨佔鎖。這個時候才可以被別的執行緒佔用*/
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

共享鎖

  • 共享鎖的實現主要應用場景就是在讀場景。獨佔鎖應用場景就是寫場景。這個在ReentrantReadWriteLock類中使用了這兩種場景。上面獨佔鎖我們通過ReentrantLock閱讀了一遍。下面我們通過ReentrantReadWriteLock來體驗下共享鎖的邏輯吧。

  • 共享鎖邏輯有所變動。但是裡面涉及到的方法在獨佔鎖中都提到了。下面我們會提及下未提到的方法。公用的方法聰明的你應該是閱讀明白了。



  • 同樣tryAcquireShared方法這裡暫時不看。到後面和釋放方法一起閱讀。我們先來通過doAcquireShared方法為入口進行閱讀

獲取共享鎖

doAcquireShared


/**
* 這個方法仔細看其實和獨佔鎖acquire是一樣的邏輯。只不過方法全都提到方* 法內部了。
* addWaiter和獨佔鎖中是一個方法
* 後面的for迴圈也是一樣的,如果是head的後繼節點則會執嘗試獲取鎖,並替* 換head。並且如果執行緒阻塞過就會自行中斷執行緒等操作。所以看完獨佔鎖在* 學習共享鎖就很容易了。兩者雖有不同但是還是及其相似的
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
/*如果前驅節點是head節點就會去嘗試獲取鎖,有可能會成功*/
int r = tryAcquireShared(arg);
if (r >= 0) {
/*獲取成功就會將節點剔除,從而head節點指向最新節點*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate


/**
* progagate表示當前共享鎖的容量
* node 表示當前執行緒對應的Node
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 在方法外部已經確保了Progagate>=0
* progagate=0表示當前共享鎖已經無法被獲取了。所以這裡條件
* 之一 progagte>0
* 1、progagate>0 那麼就會檢視佇列中後繼節點是否符合條件,如果符* 合的 則通過doReleaseShared方法進行喚醒佇列中head的後繼節點
* 2、head==null 表示AQS還沒有建立head這個時候出發釋放的方法是為* 了讓釋放這個過程啟動。內部實現因為是for迴圈。相當於監聽head節點
* 3、head.waitStatus<0 表示在doReleaseShared被設定成
* Node.PROGAGATE屬性了。釋放鎖的時候會設定head的狀態從
* SIGNAL置為0,也會從0置為PROGAGATE。head節點預設的狀態也是0,
* 所以這裡的head狀態小於0只可能是被另外一個執行緒釋放資源是
* 執行了置為PROGAGATE的程式碼了。雖然progagate==0但是隻是
* 獲取那會是0在高併發場景下會被改變的。既然另外一個執行緒釋放
* 資源那麼這裡自然就可以去喚醒佇列執行緒去嘗試獲取。這裡條件判斷
* 我們後面整個邏輯講完會重新梳理下這個地方
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

doReleaseShared


private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
/*獲取head*/
int ws = h.waitStatus;
/*
*head節點狀態預設是0,所以在佇列中第一次應該是進入
*下面的if中並且設定head節點為傳播狀態;設定成傳播狀
*態的目的是為了方便對應上面我們方法中的
*判斷h.waitStatus < 0 。這樣就會去喚醒head節點
*的後繼節點了。這個時候可能會失敗但是共享就是讓他
*們儘可能的獲取。所以這裡設定傳播狀態。也有可能
*會經過shouldParkAfterFailedAcquire方法將傳播
*狀態糾偏為SIGNAL狀態,也就是後面會被糾正過來。這個
*時候需要和shouldParkAfterFailedAcquire對比,
*shouldParkAfterFailedAcquire是遇到SIGNAL狀態對
*後繼節點進行阻塞,而在這裡是遇到SIGNAL狀態就進行釋放
*/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
/*與獨佔鎖一樣*/
unparkSuccessor(h);
}
/*這裡就是設定傳播狀態,與setHeadAndPropagate方法對應*/
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

釋放共享鎖

  • tryReleaseShared同理是交由子類實現的。後面我們通過ReentrantReadWriteLock類來看這兩個方法實現邏輯。最終AQS的釋放邏輯還是放在的doReleaseShared方法上。

doReleaseShared

  • 在上面閱讀獲取共享鎖時,設定head節點後會檢查後繼節點,判斷是否需要喚醒的時候就是doReleaseShared 。 所以這個方法這裡也不需要說了。

tryAcquireShared


/**
* 與讀鎖不衝突的前提下獲取寫鎖,有剩餘的前提下會一直獲取直至獲取成功,
* 獲取失敗返回 -1
* 獲取成功返回 1
*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
/*exclusiveCount就是c與獨佔鎖容量的一個與運算。共享容量2^16-1
*所以只要c!=0 exclusiveCount(c)就!=0,另一個條件時判斷是否
*是當前執行緒。這個也是可重入式鎖的憑證
*/
/*
* 讀鎖和寫鎖是互斥的,所以這裡如果其他執行緒已經獲取了寫鎖,那麼
* 讀鎖就沒法獲取了。
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
/*sharedCount就是獲取共享鎖的容量*/
int r = sharedCount(c);
/*readerShouldBlock就是判斷是否需要對該節點進行阻塞,只要是有
*效節點且是共享節點就不阻塞;讀鎖寫鎖是一個32位表示的,高位寫
*鎖低位讀鎖,SHARED_UNIT是低16位,所以這裡就是增加讀鎖次數*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
/*表示第一次讀*/
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
/*第一次讀的執行緒重複讀,累計執行緒讀取次數*/
firstReaderHoldCount++;
} else {
/*實際上就是一個ThreadLocal管理讀的次數。和上面firstReader作用一樣。*/
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
/*高併發場景下CAS疊加次數不一定會成功,這個時候需要*fullTryAcquireShared再次獲取讀鎖,這個方法邏輯和上面可以說是
*一樣的。那麼為什麼他叫full ,因為裡面用了迴圈確保在有剩餘的條件
*下一隻獲取讀鎖。不會因為CAS的問題獲取不到*/
return fullTryAcquireShared(current);
}

tryReleaseShared


/**
* 這裡只要有讀鎖存在就會返回false ,這裡有個疑問,
* 如果返回false那麼AQS的release就無法去釋放佇列。這種情況
* 是因為佇列本身是活躍的。會按順序釋放鎖的。而讀鎖的釋放
* 其實在tryReleseShared裡就釋放了。讀鎖其實就是計數。
* 這裡會在ReentrantReadWriteLock章節詳細解說
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 當前執行緒是第一個獲取讀鎖的。這裡會加讀的次數一直遞減。
//當前執行緒全部釋放完了,就接觸當前執行緒的佔位
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//這裡針對非第一個獲取讀鎖的執行緒進行釋放。顯示次數的釋放
//完全釋放後就丟棄執行緒
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//這裡和上面的fullTryAcquireShared對應。迴圈釋放一直到釋放成功為止
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

總結

AQS是jdk中併發類的一個底層原理。好多jdk的併發類都是基於此實現的。AQS其實就是一個框架。簡單總結幾句話

  • AQS是併發的一個基類
  • 內部維護了FIFO佇列
  • 擁有兩種模式: 獨佔模式(寫鎖)、共享模式(讀鎖)

內部state就是表示鎖的狀態。不同的實現可以有不同的定義。

ReentrantLock : 純粹鎖的狀態 +1、-1

Semaphore : 鎖的個數

CountDownLatch: 計數器,一個標誌位