抽象同步佇列AQS——AbstractQueuedSynchronizer鎖詳解
AQS——鎖的底層支援
談到併發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!
類如其名,抽象的佇列式的同步器,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch...
併發包的底層就是使用AQS實現的,以下是AQS的類圖結構
框架
它維護了一個volatile int state(代表共享資源)和一個FIFO執行緒等待佇列(多執行緒競爭資源被阻塞會進入此佇列)。這裡volatile保證執行緒可見性。
state的訪問方式有三種:
getState()
setState()
compareAndSetState()
這三種都是原子操作,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法。程式碼如下:
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
自定義資源共享方式
AQS定義了兩種資源共享方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。
不同的自定義同步器爭用共享資源的方式也不同,自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法。
isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int):共享方式。嘗試獲取資源,負數表示失敗;0表示成功,但沒用剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待節點返回true,否則返回false。
原始碼實現
接下來我們開始開始講解AQS的原始碼實現。依照acquire-release、acquireShared-releaseShared的次序來。
1. acquire(int)
acquire是一種以獨佔方式獲取資源,如果獲取到資源,執行緒直接返回,否則進入等待佇列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨佔模式下執行緒獲取共享資源的頂層入口。
獲取到資源後,執行緒就可以去執行其臨界區程式碼了。下面是acquire()的原始碼
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
通過註釋我們知道,acquire方法是一種互斥模式,且忽略中斷。該方法至少執行一次tryAcquire(int)
方法,如果tryAcquire(int)
方法返回true,則acquire直接返回,否則當前執行緒需要進入佇列進行排隊。函式流程如下
1、tryAcquire():嘗試直接獲取資源,如果成功則直接返回;
2、addWaiter():將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
3、acquireQueued():使執行緒在等待佇列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
4、如果執行緒在等待過程中被中斷過,它是不響應的。只有獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。
相關方法介紹
1.tryAcquire(int)
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
tryAcquire嘗試以獨佔的方式獲取資源,如果獲取成功,則直接返回true,否則直接返回false。該方法可以用於實現Lock中的tryLock()方法。該方法的預設實現是丟擲UnsupportedOperationException異常,
具體實現由自定義的擴充套件了AQS的同步類來實現。AQS在這裡只負責定義了一個公共的方法框架。這裡之所以沒用定義為abstract,是因為獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared
如果都定義成abstract,那麼每個模式都要去實現另外一個模式下的介面。
2.addWaiter(Node)
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
該方法用於將當前執行緒根據不同的模式,(Node.EXCLUSIVE
互斥模式、Node.SHARED
共享模式)加入到等待佇列的隊尾並返回當前執行緒所在的節點。如果佇列不為空,
則以通過compareAndSetTail
方法以CAS的方式將當前節點加入到等待佇列的末尾。否則,通過enq(node)方法初始化一個等待佇列,並返回當前節點。
3. enq(node)
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq(node)用於將當前節點插入到等待佇列,如果佇列為空,則初始化當前佇列。整個過程以CAS自旋的方式進行,直到成功加入隊尾為止。
4.acquireQueued(Node, int)
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//——標記是否成功拿到資源,預設是false try { boolean interrupted = false;//——標記等待過程是否被中斷過 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued()
用於佇列中的執行緒自旋地以獨佔且不可中斷的方式獲取同步狀態(acquire),直到拿到鎖之後再返回。該方法的實現分成兩部分:
如果當前節點已經成為頭結點,嘗試獲取鎖(tryAcquire)成功,然後返回;否則檢查當前節點是否應該被park,然後將該執行緒park並且檢查當前執行緒是否被可以被中斷。
5.shouldParkAfterFailedAcquire(Node, Node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
shouldParkAfterFailedAcquire方法通過對當前節點的前一個節點的狀態進行判斷,對當前節點做出不同的操作,至於每個Node的狀態表示,可以參考介面文件。
6.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
該方法讓執行緒去休息,真正進入等待狀態。park()會讓當前執行緒進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該執行緒:
1)被unpark();
2)被interrupt()。
需要注意的是,Thread.interrupted()會清除當前執行緒的中斷標記位。
我們再回到acquireQueued(),總結下該函式的具體流程:
1、節點進入隊尾後,檢查狀態,找到安全休息點
2、呼叫park()進入waiting狀態,等待unpark()或interrupt()喚醒自己
3、被喚醒後,看自己是不是有資格能拿到號。如果能拿到,head指向當前節點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒用拿到,繼續流程1
最後,總結一下acquire()的流程:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1、呼叫自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
2、沒成功,則執行addWaiter()將執行緒加入等待佇列的尾部並標記為獨佔模式;
3、acquireQueued()使執行緒在等待佇列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源才返回。如果在整個等待過程中被中斷過,則會返回true,否則返回false。
4、如果執行緒在等待過程中被中斷過,他是不響應的。只是獲取資源後才進行自我中斷selfInterrupt(),將中斷補上。
未完...
參考書籍
Java併發程式設計之美
參考連結
https://www.jianshu.com/p/da9d051dcc3d