1. 程式人生 > >AbstractQueuedSynchronizer(AQS)原始碼解析

AbstractQueuedSynchronizer(AQS)原始碼解析

      關於AQS的原始碼解析,本來是沒有打算特意寫一篇文章來介紹的。不過在寫本學期課程作業中,有一門寫了關於AQS的,而且也畫了一些相關的圖,所以直接拿過來分享一下,如有錯誤歡迎指正。
      然後基本簡介也都不介紹了,網上一大堆,這裡就直接進行原始碼的分析了。

AQS基本屬性

      
      AQS屬性簡介:

屬性 型別 詳解
Head Node型別 持有鎖的執行緒結點,也是佇列中的頭結點
Tail Node型別 阻塞佇列中的尾結點,同時每一個新的結點進來,都插入到阻塞佇列的最後。
State int型別 大於等於0。代表當前鎖的狀態。0代表沒有執行緒佔用當前鎖,大於0代表有執行緒持有鎖。
exclusiveOwnerThread(繼承自AOS) Thread型別 代表獨佔鎖的執行緒。

      AQS的具體結構如下圖所示:

      在AQS連結串列中,將每一個執行緒包裝成Node例項,並通過連結串列的形式連結儲存,在鏈式結構中,節點通過next和prev分別與前驅節點和後置節點相連線。其中head節點表示為當前持有鎖的執行緒,不在阻塞佇列中。tail節點為連結串列中最後一個節點,當有新的節點被新增到連結串列中後,AQS會將tail引用指向最後一個被新增進連結串列的節點。

AQS中Node內部類

      
      Node屬性簡介:

欄位 簡介 欄位 簡介
SHARE 標識節點當前在共享模式下 EXCLUSIVE 標識節點當前在獨佔模式下
CANCELLED 標識當前節點所表示的執行緒已取消搶鎖 SIGNAL 標識當前節點需要在釋放鎖後喚醒後繼節點
CONDITION 與ConditionObject內部類有關 waitStatue 取值為以上幾種狀態
prev 代表當前節點的前驅節點 next 代表當前節點的後繼節點
thread 代表當前節點所表示的執行緒

1 加鎖

      這裡以一個鎖的具體使用方法對AQS類進行詳細的分析:

      首先,執行緒先對鎖物件進行獲取操作,如果當前需要獲取的鎖物件並沒有其他執行緒所持有,成功獲取到了鎖,將執行相關的業務程式碼,執行完畢後,對鎖資源進行釋放,以便其他執行緒所使用。如果當前執行緒獲取鎖資源失敗,說明鎖資源有其他執行緒在使用,當前執行緒將進行阻塞狀態,等待再次獲取鎖資源。

1.1 AQS中如何獲取鎖

java.util.concurrent.locks.ReentrantLock.java檔案中的公平鎖為例:

abstract static class Sync extends AbstractQueuedSynchronizer
#java.util.concurrent.locks.ReentrantLock中第220行
static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);  #呼叫了AQS中的方法
        }
...
}
================AQS====================
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第1197行
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

      在lock()方法中,執行緒首先嚐試搶鎖tryAcquire(1),如果搶鎖成功則直接返回true,代表當前執行緒已持有鎖資源,否則返回false,進行下一次搶鎖動作。
      當執行緒搶鎖失敗後,AQS將將當前執行緒封裝成Node節點,並新增到阻塞佇列。之後將從阻塞佇列中依次取出等待鎖的Node節點,並再次嘗試獲取鎖.如果再次獲取鎖失敗,則使當前執行緒自己中斷自己。

1.2 嘗試獲取鎖資源

      首先獲取鎖的狀態,判斷當前是否有執行緒持有鎖,這裡分為兩種情況:

  • 如果當前並沒有執行緒持有鎖資源,則判斷阻塞佇列中是否有節點排在當前節點的前面等待獲取鎖資源。這裡分為兩種情況:

    • 如果有其他執行緒在等待獲取鎖資源,則進行等待
    • 如果沒有其他執行緒在等待獲取鎖資源,表明當前執行緒是第一個等待獲取鎖的執行緒,隨後嘗試對鎖資源進行獲取,如果成功獲取到鎖資源則將當前執行緒設定為獨佔鎖的執行緒,同時返回true.
  • 如果當前有執行緒持有鎖,則進行判斷是否是當前執行緒所持有鎖資源,這是分為兩種情況:
    • 鎖資源被當前執行緒所持有,則表明是重入鎖,隨後將獲取鎖的次數加一,返回true.
    • 持有鎖資源並不是當前執行緒,返回false.

流程圖如下:

原始碼:

#java.util.concurrent.locks.ReentrantLock中第231行
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
        }
        return false;
}

1.3 判斷阻塞佇列中是否有其他節點

      線上程獲取鎖之前,首先判斷阻塞佇列中是否有其他節點,如果有其他節點則放棄搶鎖。
首先獲取AQS連結串列中的頭節點與尾節點,分別進行判斷:

  • 頭節點是否等於尾節點
    • 如果頭節點等於尾節點說明阻塞佇列為空,沒有其他節點返回false
  • 如果頭節點不等於尾節點,則判斷頭節點的後置節點是否為空
    • 如果頭節點的後置節點不為空,則說明阻塞佇列不為空,則判斷阻塞佇列中第一個節點執行緒是否為當前執行緒
      • 如果是當前執行緒說明阻塞佇列中沒有其他節點返回false。
      • 如果不是當前執行緒說明阻塞佇列中有其他節點,返回true.

流程圖如下:

原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第1512行
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

1.4 將當前執行緒新增到阻塞佇列

      如果當前執行緒搶鎖失敗則通過AQS將當前執行緒包裝成Node節點新增進阻塞佇列。

  1. 將當前執行緒以獨佔鎖的模式包裝成Node節點。

  2. 將當前節點新增進阻塞佇列。分兩種情況:
    • 阻塞佇列中尾節點不為空。
      • 將尾節點置為當前節點的前驅節點,通過CAS操作將當前節點置為尾節點。
        • 如果成功,則將之前尾結點的後置引用指向當前節點,將當前節點返回。
        • 如果存在另一節點提前完成上一步操作,則進行入隊操作。
    • 阻塞佇列中尾節點為空,則進行入隊操作。
  3. 入隊操作結束將當前節點返回。

流程圖如下:

原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第605行
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

1.5 入隊操作

      這一步將當前節點新增到阻塞佇列中。
      首先獲取阻塞佇列中的尾節點,判斷是否為空,有兩種情況:

  • 阻塞佇列中尾節點為空,則初始化阻塞佇列,將頭節點設定為尾節點,
    再次獲取尾節點,判斷是否為空。
  • 阻塞佇列中尾節點不為空,則將尾節點設定為當前節點的前驅節點。
    通過CAS將當前節點設定為尾節點。這裡有兩種情況:
    • 如果成功,則將之前尾結點的後置引用指向當前節點,將當前節點的前驅節點返回。
    • 存在另一節點提前完成上一步操作,則再次獲取阻塞佇列中的尾節點,判斷是否為空。

流程圖如下:

原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第583行
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

1.6 搶鎖或將執行緒掛起

      到達這一步說明節點已進入阻塞佇列,節點嘗試獲取鎖或者進行掛起操作。

  1. 獲取當前節點的前驅節點
  2. 判斷前驅節點是否為頭節點,這裡有兩種情況:
    • 前驅節點為頭節點,說明當前節點前面沒有節點在等待獲取鎖資源,只需要等待前驅節點釋放鎖資源。所以可以嘗試搶鎖,這裡有兩種情況:
      • 搶鎖成功,則將當前節點設定為頭節點,將當前節點前驅節點的後置引用設定為空,返回false
      • 搶鎖失敗,說明頭節點還沒有釋放鎖資源,此時將當前節點掛起。這裡有兩種情況:
        • 如果掛起成功,則執行緒等待被喚醒,喚醒之後再次判斷前驅節點是否為頭節點。
        • 如果掛起失敗,再次判斷前驅節點是否為頭節點。
    • 前驅節點不是頭節點,說明當前節點前面有其他節點在等待獲取鎖資源,此時將當前節點掛起。
  3. 如果在掛起階段發生異常,則取消搶鎖。
  4. 這裡為無限迴圈,直到執行緒獲取到鎖資源或者取消搶鎖才會退出迴圈。

流程圖如下:

原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第857行
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
            }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

1.7 判斷是否應該掛起當前執行緒

      當執行緒暫時獲取不到鎖資源時,判斷是否應該掛起當前執行緒。
      首先獲取當前節點的前驅節點的狀態,這裡有三種情況:
* 前驅節點的狀態為SIGNAL。其中,SIGNAL表明該節點在釋放鎖資源後應該將後置節點喚醒。返回true。
* 前驅節點的狀態為CANCELLED。CANCELLED表明該節點已取消搶鎖,此時將從當前節點開始向前尋找,直到找到一個節點的狀態不為CANCELLED,然後將他設定為當前節點的前驅節點。之後返回false.
* 如果前驅節點的狀態不是以上兩種情況,則通過CAS將前驅節點的狀態設定為SIGNAL,之後返回false。

流程圖如下:

原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第795行
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

1.8 掛起當前執行緒

      將當前執行緒掛起,當執行緒被喚醒後將執行緒的中斷狀態返回.
原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第835行
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

2 解鎖

2.1 解鎖操作

      嘗試釋放鎖資源,這裡有兩種情況:

  • 成功釋放鎖資源,獲取到AQS連結串列中頭節點,判斷頭節點是否為空,這裡有兩種情況:
    • 如果頭節點為空,說明沒有節點持有鎖資源,返回true.
    • 如果頭節點不為空,判斷頭節點狀態是否為0:
      • 如果頭節點狀態為0,說明阻塞佇列中沒有執行緒在等待獲取鎖,返回true.
      • 如果頭節點狀態不為0,則將阻塞佇列中第一個等待獲取鎖資源的執行緒喚醒。隨後返回ture.

流程圖如下:

原始碼:

#java.util.concurrent.locks.ReentrantLock中第456行
public void unlock() {
    sync.release(1);
}
==============================
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第1260行
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

2.2 喚醒後置節點

      當持有鎖的節點執行相關程式碼完成後,需要釋放鎖資源並喚醒後置節點。

  1. 首先獲取頭節點的狀態,如果小於0則通過CAS將狀態設定為0.
  2. 獲取頭節點的後置節點,這裡有兩種情況:
    • 如果頭節點的後置節點為空或者頭節點的後置節點的狀態大於0,則將頭節點的後置節點置為空,同時從AQS連結串列的尾節點向前搜尋,直到找到最後一個節點狀態小於等於0的節點,將該節點喚醒。
    • 如果頭節點的後置節點不為空,則直接將該節點喚醒。

流程圖如下:

原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第638行
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

2.3 取消搶鎖

      當執行緒由於異常或某些特殊情況的發生,需要取消對鎖資源的獲取,將執行取消搶鎖操作。

  1. 如果需要取消搶鎖的節點為空,則直接返回。
  2. 否則將節點所包裝的執行緒置為空。
  3. 獲取節點的前驅節點,判斷前驅節點的狀態是否大於0,如果大於0則一直向前找,直到找到一個節點的狀態小於等於0,將該節點設定為當前節點的前驅節點。
  4. 獲取當前節點的後置節點。
  5. 將當前節點的狀態設定為CANCELLED。
  6. 判斷當前節點是否為尾節點,這裡有兩種情況:
    • 如果當前節點是尾節點,則將當前節點的前驅節點設定為尾節點,
      同時將後置引用設定為空。
    • 如果當前節點不是尾節點,判斷當前節點的前驅節點是否為頭節
      點。這裡有兩種情況:
      • 如果當前節點的前驅節點是頭節點,則將當前節點喚醒。
      • 如果當前節點的前驅節點不是頭節點,則判斷該節點狀態是否為SIGNAL,如果為SIGNAL,則將該節點的後置引用指向當前節點的後置節點。
    • 斷開當前節點與連結串列的連線。

流程圖如下:

原始碼:

#java.util.concurrent.locks.AbstractQueuedSynchronizer中第742行
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    
    node.thread = null;

    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    node.waitStatus = Node.CANCELLED;

    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
            compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

      其實到這裡還有一些內容並沒有分析完,以後再補上好了