java併發:AQS
AQS是一個FIFO的雙向佇列,其內部通過head和tail記錄隊首和隊尾元素,佇列元素的型別為 Node。
Node
Node 中的 thread變數用來存放進入 AQS 佇列的執行緒;
Node 中的 SHARED 用來標記該執行緒是獲取共享資源時被阻塞掛起後放入 AQS 佇列的;EXCLUSIVE 用來標記該執行緒是獲取獨佔資源時被阻塞掛起後放入 AQS 佇列的;
waitStatus 記錄當前執行緒等待狀態,可以為:CANCELLED (執行緒被取消了)、SIGNAL (執行緒需要被喚醒)、CONDITION (執行緒在條件佇列裡面等待〉、PROPAGATE (釋放共享資源時需要通知其他節點);
prev 記錄當前節點的前驅節點;next 記錄當前節點的後繼節點
ConditionObject
AQS 的內部類 ConditionObject 結合鎖實現執行緒同步。
ConditionObject 可以直接訪問 AQS物件內部的變數,比如 state 和 AQS 佇列。
ConditionObject 是條件變數,每個條件變數在內部維護了一個條件佇列 (單向連結串列佇列),這個條件佇列和 AQS 佇列不是一回事。
此處的佇列是用來存放呼叫條件變數的 await 方法後被阻塞的執行緒,佇列的頭、尾元素分別為 firstWaiter 和 lastWaiter。
Note:
呼叫條件變數的 await()方法就相當於呼叫共享變數的 wait()方法,呼叫條件變數的 signal方法就相當於呼叫共享變數的 notify()方法,呼叫條件變數的 signa!All ( )方法就相當於呼叫共享變數的 notifyAll()方法。
至此,相信大家都已經知道條件變數是什麼了,它能用來做什麼。
示例
示例中(2)處使用Lock 物件的 newCondition ()方法建立了一個 ConditionObject 變數,該變數就是 Lock鎖對應的一個條件變數。
示例中(3)處獲取了獨佔鎖,示例中(4)處則呼叫了條件變數的 await ()方法阻塞掛起了當前執行緒。
當其他執行緒呼叫條件變數的 signal方法時,被阻塞的執行緒才會從 await處返回。
Note:
在呼叫條件變數的 signal 和 await方法前必須先獲取條件變數對應的鎖,如果在沒有獲取到鎖之前呼叫了條件變數的 await方法則會丟擲 java.lang.IllegalMonitorStateException異常。
一個 Lock物件可以建立多個條件變數。
AQS 只提供了 ConditionObject 的實現,並沒有提供 newCondition 函式,該函式用來 new 一個 ConditionObject物件;需要由 AQS 的子類來實現 newCondition 函式。
小結:
下圖反映了前面描述的關係:
詳解:
當多個執行緒同時呼叫 lock.lock()方法獲取鎖時,只有一個執行緒獲取到了鎖,其他執行緒會被轉換為 Node 節點插入到 lock 鎖對應的 AQS 阻塞佇列裡面,並做自旋 CAS 嘗試獲取鎖。
如果獲取到鎖的執行緒呼叫了對應條件變數的 await()方法,則該執行緒會釋放獲取到的鎖,並被轉換為 Node 節點插入到條件變數對應的條件佇列裡面。
此時因呼叫 lock.lock() 方法被阻塞到 AQS 佇列裡面的一個執行緒會獲取到被釋放的鎖,如果該執行緒也呼叫了條件變數的 await ()方法則該執行緒也會被放入條件變數的條件佇列裡面。
當另外一個執行緒呼叫條件變數的 signal()或者 signa!All()方法時,會把條件佇列裡面的一個或者全部 Node節點移動到 AQS 的阻塞佇列裡面,等待時機獲取鎖。
state
在 AQS 中維持了一個狀態值 state,可以通過 getState、setState、compareAndSetState 函式修改其值,程式碼如下:
/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); }
對於 AQS 來說,執行緒同步的關鍵是對 state 進行操作。
根據 state 是否屬於一個執行緒,操作 state 的方式分為獨佔方式和共享方式。
獨佔方式
使用獨佔方式獲取的資源是與具體執行緒繫結的,也就是說如果一個執行緒獲取到了資源,則進行標記,其他執行緒嘗試操作 state 獲取資源時會發現當前該資源的持有者不是自己,於是在獲取失敗後被阻塞。
例子:獨佔鎖 ReentrantLock 的實現
當一個執行緒獲取了Reer rantLock的鎖後,在 AQS 內部會使用 CAS 操作把狀態值 state 從0 變為 1,然後設定當前鎖的持有者為當前執行緒,當該執行緒再次獲取鎖時發現它就是鎖的持有者,則把狀態值從 l 變為 2,也就是設定可重入次數,而當另外一個執行緒獲取鎖時發現自己不是該鎖的持有者就會被放入 AQS 阻塞佇列後掛起。
在獨佔方式下獲取和釋放資源的方法為:
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * Acquires in exclusive mode, aborting if interrupted. * Implemented by first checking interrupt status, then invoking * at least once {@link #tryAcquire}, returning on * success. Otherwise the thread is queued, possibly repeatedly * blocking and unblocking, invoking {@link #tryAcquire} * until success or the thread is interrupted. This method can be * used to implement method {@link Lock#lockInterruptibly}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
共享方式
使用共享方式獲取資源與具體執行緒不相關;當多個執行緒去請求資源時通過 CAS 方式競爭。
當一個執行緒獲取到資源後,另一個執行緒嘗試獲取資源時,如果當前資源能滿足它的需要,則當前執行緒只需要使用 CAS 方式進行獲取即可。
例子:Semaphore 訊號量
當一個執行緒通過 acquire()方法獲取訊號量時,會首先看當前訊號量個數是否滿足需要,不滿足則把當前執行緒放入阻塞佇列,如果滿足則通過自旋 CAS 獲取訊號量。
在共享方式下獲取和釋放資源的方法為:
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
Note:
AQS是鎖和同步容器的基礎框架,AQS並沒有提供可用的tryAcquire和tryRelease方法。
tryAcquire和 tryRelease需要由具體的子類來實現。
子類在實現 tryAcquire和 tryRelease時要根據具體場景使用 CAS演算法嘗試修改 state狀態值, 成功則返回 true,否則返回 false。
子類還需要定義在呼叫 acquire 和 release 方法時狀態值 state 的增減代表什麼含義。
例子:
繼承自 AQS 實現的獨佔鎖 ReentrantLock,定義當 status 為 0 時表示鎖空閒,為 1 時表示鎖己經被佔用。
故其在重寫 tryAcquire 時,需要使用 CAS 演算法檢視當前 state 是否為 0,如果為 0 則使用 CAS 設定為 1,並設定當前鎖的持有者為當前執行緒,而後返回true;如果 CAS 失敗則返回 false。
同理,tryAcquireShared 和 tryReleaseShared 也需要由具體的子類來實現。
例子:
繼承自 AQS 實現的讀寫鎖 ReentrantReadWriteLock,讀鎖在重寫 tryAcquireShared 時,首先檢視寫鎖是否被其他執行緒持有,如果是則直接返回 false; 否則使用 CAS 遞增 state 的高 16 位。
(在 ReentrantReadWriteLock 中, state 的高 16 位為獲取讀鎖的次數)
基於 AQS 實現的鎖除了需要重寫上面介紹的方法外,還需要重寫 isHeldExclusively 方法,來判斷鎖是被當前執行緒獨佔還是被共享。
問題:帶有 Interruptibly關鍵字的函式和不帶該關鍵字的函式有什麼區別?
不帶 Intenuptibly 關鍵字的方法意思是不對中斷進行響應。
詳解:執行緒在呼叫不帶 Interruptibly 關鍵字的方法獲取資源時或者獲取資源失敗被掛起時,其他執行緒中斷了該執行緒,則該執行緒不會因為被中斷而丟擲異常,它還是繼續獲取資源或者被掛起,也就是說不對中斷進行響應,忽略中斷。
帶 Interruptibly關鍵字的方法要對中斷進行響應。
詳解:執行緒在呼叫帶 Interruptibly 關鍵字的方法獲取資源時或者獲取資源失敗被掛起時,其他執行緒中斷了該執行緒,則該執行緒丟擲 InterruptedException 異常而返回。