22、AtomicInteger底層實現原理是什麼?如何在自己的產品程式碼中應用CAS操作?(高併發程式設計----8)
目錄
今天我要問你的問題是,AtomicInteger 底層實現原理是什麼?如何在自己的產品程式碼中應用 CAS 操作?
在今天這一講中,我來分析一下併發包內部的組成,一起來看看各種同步結構、執行緒池等,是基於什麼原理來設計和實現的。
今天我要問你的問題是,AtomicInteger 底層實現原理是什麼?如何在自己的產品程式碼中應用 CAS 操作?
典型回答
AtomicIntger 是對 int 型別的一個封裝,提供原子性的訪問和更新操作,其原子性操作的實現是基於 CAS(compare-and-swap)技術。
所謂 CAS,表徵的是一些列操作的集合,獲取當前數值,進行一些運算,利用 CAS 指令試圖進行更新。如果當前數值未變,代表沒有其他執行緒進行併發修改,則成功更新。否則,可能出現不同的選擇,要麼進行重試,要麼就返回一個成功或者失敗的結果。
從 AtomicInteger 的內部屬性可以看出,它依賴於 Unsafe 提供的一些底層能力,進行底層操作;以 volatile 的 value
欄位,記錄數值,以保證可見性。
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe(); private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value"); private volatile int value;
具體的原子操作細節,可以參考任意一個原子更新方法,比如下面的 getAndIncrement。
Unsafe 會利用 value 欄位的記憶體地址偏移,直接完成操作。
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
因為 getAndIncrement 需要返歸數值,所以需要新增失敗重試邏輯。
public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!weakCompareAndSetInt(o, offset, v, v + delta)); return v; }
而類似 compareAndSet 這種返回 boolean 型別的函式,因為其返回值表現的就是成功與否,所以不需要重試。
public final boolean compareAndSet(int expectedValue, int newValue)
CAS 是 Java 併發中所謂 lock-free 機制的基礎。
考點分析
今天的問題有點偏向於 Java 併發機制的底層了,雖然我們在開發中未必會涉及 CAS 的實現層面,但是理解其機制,掌握如何在 Java 中運用該技術,還是十分有必要的,尤其是這也是個併發程式設計的面試熱點。
有的同學反饋面試官會問 CAS 更加底層是如何實現的,這依賴於 CPU 提供的特定指令,具體根據體系結構的不同還存在著明顯區別。比如,x86 CPU 提供 cmpxchg 指令;而在精簡指令集的體系架構中,則通常是靠一對兒指令(如“load and reserve”和“store conditional”)實現的,在大多數處理器上 CAS 都是個非常輕量級的操作,這也是其優勢所在。
大部分情況下,掌握到這個程度也就夠用了,我認為沒有必要讓每個 Java 工程師都去了解到指令級別,我們進行抽象、分工就是為了讓不同層面的開發者在開發中,可以儘量遮蔽不相關的細節。
如果我作為面試官,很有可能深入考察這些方向:
- 在什麼場景下,可以採用 CAS 技術,呼叫 Unsafe 畢竟不是大多數場景的最好選擇,有沒有更加推薦的方式呢?畢竟我們掌握一個技術,cool 不是目的,更不是為了應付面試,我們還是希望能在實際產品中有價值。
- 對 ReentrantLock、CyclicBarrier 等併發結構底層的實現技術的理解。
知識擴充套件
關於 CAS 的使用,你可以設想這樣一個場景:在資料庫產品中,為保證索引的一致性,一個常見的選擇是,保證只有一個執行緒能夠排他性地修改一個索引分割槽,如何在資料庫抽象層面實現呢?
可以考慮為索引分割槽物件新增一個邏輯上的鎖,例如,以當前獨佔的執行緒 ID 作為鎖的數值,然後通過原子操作設定 lock 數值,來實現加鎖和釋放鎖,虛擬碼如下:
public class AtomicBTreePartition {
private volatile long lock;
public void acquireLock(){}
public void releaseeLock(){}
}
那麼在 Java 程式碼中,我們怎麼實現鎖操作呢?Unsafe 似乎不是個好的選擇,例如,我就注意到類似 Cassandra 等產品,因為 Java 9 中移除了 Unsafe.moniterEnter()/moniterExit(),導致無法平滑升級到新的 JDK 版本。目前 Java 提供了兩種公共 API,可以實現這種 CAS 操作,比如使用 java.util.concurrent.atomic.AtomicLongFieldUpdater,它是基於反射機制建立,我們需要保證型別和欄位名稱正確。
private static final AtomicLongFieldUpdater<AtomicBTreePartition> lockFieldUpdater =
AtomicLongFieldUpdater.newUpdater(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!lockFieldUpdater.compareAndSet(this, 0L, t)){
// 等待一會兒,資料庫操作可能比較慢
…
}
}
Atomic 包提供了最常用的原子性資料型別,甚至是引用、陣列等相關原子型別和更新操作工具,是很多執行緒安全程式的首選。
我在專欄第七講中曾介紹使用原子資料型別和 Atomic*FieldUpdater,建立更加緊湊的計數器實現,以替代 AtomicLong。優化永遠是針對特定需求、特定目的,我這裡的側重點是介紹可能的思路,具體還是要看需求。如果僅僅建立一兩個物件,其實完全沒有必要進行前面的優化,但是如果物件成千上萬或者更多,就要考慮緊湊性的影響了。
而 atomic 包提供的LongAdder,在高度競爭環境下,可能就是比 AtomicLong 更佳的選擇,儘管它的本質是空間換時間。迴歸正題,如果是 Java 9 以後,我們完全可以採用另外一種方式實現,也就是 Variable Handle API,這是源自於JEP 193,提供了各種粒度的原子或者有序性的操作等。我將前面的程式碼修改為如下實現:
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle
(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!HANDLE.compareAndSet(this, 0L, t)){
// 等待一會兒,資料庫操作可能比較慢
…
}
}
過程非常直觀,首先,獲取相應的變數控制代碼,然後直接呼叫其提供的 CAS 方法。
一般來說,我們進行的類似 CAS 操作,可以並且推薦使用 Variable Handle API 去實現,其提供了精細粒度的公共底層
API。我這裡強調公共,是因為其 API 不會像內部 API 那樣,發生不可預測的修改,這一點提供了對於未來產品維護和升級的基礎保障,坦白說,很多額外工作量,都是源於我們使用了 Hack 而非 Solution 的方式解決問題。
CAS 也並不是沒有副作用,試想,其常用的失敗重試機制,隱含著一個假設,即競爭情況是短暫的。大多數應用場景中,確實大部分重試只會發生一次就獲得了成功,但是總是有意外情況,所以在有需要的時候,還是要考慮限制自旋的次數,以免過度消耗 CPU。
另外一個就是著名的ABA問題,這是通常只在 lock-free 演算法下暴露的問題。我前面說過 CAS 是在更新時比較前值,如果對方只是恰好相同,例如期間發生了 A -> B -> A 的更新,僅僅判斷數值是 A,可能導致不合理的修改操作。針對這種情況,Java 提供了 AtomicStampedReference 工具類,通過為引用建立類似版本號(stamp)的方式,來保證 CAS 的正確性,具體用法請參考這裡的介紹。
前面介紹了 CAS 的場景與實現,幸運的是,大多數情況下,Java 開發者並不需要直接利用 CAS 程式碼去實現執行緒安全容器等,更多是通過併發包等間接享受到 lock-free 機制在擴充套件性上的好處。
下面我來介紹一下 AbstractQueuedSynchronizer(AQS),其是 Java 併發包中,實現各種同步結構和部分其他組成單元(如執行緒池中的 Worker)的基礎。
學習 AQS,如果上來就去看它的一系列方法(下圖所示),很有可能把自己看暈,這種似懂非懂的狀態也沒有太大的實踐意義。
我建議的思路是,儘量簡化一下,理解為什麼需要 AQS,如何使用 AQS,至少要做什麼,再進一步結合 JDK 原始碼中的實踐,理解 AQS 的原理與應用。
Doug Lea曾經介紹過 AQS 的設計初衷。從原理上,一種同步結構往往是可以利用其他的結構實現的,例如我在專欄第 19 講中提到過可以使用 Semaphore 實現互斥鎖。但是,對某種同步結構的傾向,會導致複雜、晦澀的實現邏輯,所以,他選擇了將基礎的同步相關操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 為我們構建同步結構提供了範本。
AQS 內部資料和方法,可以簡單拆分為:
- 一個 volatile 的整數成員表徵狀態,同時提供了 setState 和 getState 方法
private volatile int state;
- 一個先入先出(FIFO)的等待執行緒佇列,以實現多執行緒間競爭和等待,這是 AQS 機制的核心之一。
- 各種基於 CAS 的基礎操作方法,以及各種期望具體同步結構去實現的 acquire/release 方法。
利用 AQS 實現一個同步結構,至少要實現兩個基本型別的方法,分別是 acquire 操作,獲取資源的獨佔權;還有就是 release 操作,釋放對某個資源的獨佔。
以 ReentrantLock 為例,它內部通過擴充套件 AQS 實現了 Sync 型別,以 AQS 的 state 來反映鎖的持有情況。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { …}
下面是 ReentrantLock 對應 acquire 和 release 操作,如果是 CountDownLatch 則可以看作是
await()/countDown(),具體實現也有區別。
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
排除掉一些細節,整體地分析 acquire 方法邏輯,其直接實現是在 AQS 內部,呼叫了 tryAcquire 和 acquireQueued,這是兩個需要搞清楚的基本部分。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先,我們來看看 tryAcquire。在 ReentrantLock 中,tryAcquire 邏輯實現在 NonfairSync 和 FairSync 中,分別提供了進一步的非公平或公平性方法,而 AQS 內部 tryAcquire 僅僅是個接近未實現的方法(直接拋異常),這是留個實現者自己定義的操作。
我們可以看到公平性在 ReentrantLock 構建時如何指定的,具體如下:
public ReentrantLock() {
sync = new NonfairSync(); // 預設是非公平的
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
以非公平的 tryAcquire 為例,其內部實現瞭如何配合狀態與 CAS 獲取鎖,注意,對比公平版本的 tryAcquire,它在鎖無人佔有時,並不檢查是否有其他等待者,這裡體現了非公平的語義。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// 獲取當前 AQS 內部狀態量
if (c == 0) { // 0 表示無人佔有,則直接用 CAS 修改狀態位,
if (compareAndSetState(0, acquires)) {// 不檢查排隊情況,直接爭搶
setExclusiveOwnerThread(current); // 並設定當前執行緒獨佔鎖
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 即使狀態不是 0,也可能當前執行緒是鎖持有者,因為這是再入鎖
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
接下來我再來分析 acquireQueued,如果前面的 tryAcquire 失敗,代表著鎖爭搶失敗,進入排隊競爭階段。這裡就是我們所說的,利用 FIFO 佇列,實現執行緒間對鎖的競爭的部分,算是是 AQS 的核心邏輯。
當前執行緒會被包裝成為一個排他模式的節點(EXCLUSIVE),通過 addWaiter 方法新增到佇列中。acquireQueued 的邏輯,簡要來說,就是如果當前節點的前面是頭節點,則試圖獲取鎖,一切順利則成為新的頭節點;否則,有必要則等待,具體處理邏輯請參考我新增的註釋。
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {// 迴圈
final Node p = node.predecessor();// 獲取前一個節點
if (p == head && tryAcquire(arg)) { // 如果前一個節點是頭結點,表示當前節點合適去 tryAcquire
setHead(node); // acquire 成功,則設定新的頭節點
p.next = null; // 將前面節點對當前節點的引用清空
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) // 檢查是否失敗後需要 park
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);// 出現異常,取消
if (interrupted)
selfInterrupt();
throw t;
}
}
到這裡執行緒試圖獲取鎖的過程基本展現出來了,tryAcquire 是按照特定場景需要開發者去實現的部分,而執行緒間競爭則是 AQS 通過 Waiter 佇列與 acquireQueued 提供的,在 release 方法中,同樣會對佇列進行對應操作。
今天我介紹了 Atomic 資料型別的底層技術 CAS,並通過例項演示瞭如何在產品程式碼中利用 CAS,最後介紹了併發包的基礎技術 AQS,希望對你有所幫助。
一課一練
關於今天我們討論的題目你做到心中有數了嗎?今天佈置一個原始碼閱讀作業,AQS 中 Node 的 waitStatus 有什麼作用?