1. 程式人生 > 程式設計 >JUC之深入分析 CAS

JUC之深入分析 CAS

概述

CAS ,Compare And Swap ,即比較並交換。Doug Lea 大神在實現同步元件時,大量使用CAS 技術,鬼斧神工地實現了Java 多執行緒的併發操作。

整個 AQS 同步元件、Atomic 原子類操作等等都是基 CAS 實現的,甚至 ConcurrentHashMap 在 JDK 1.8 的版本中,也調整為 CAS + synchronized 。可以說,CAS 是整個 J.U.C 的基石。

CAS分析

在 CAS 中有三個引數:記憶體值 V、舊的預期值 A、要更新的值 B ,當且僅當記憶體值 V 的值等於舊的預期值 A 時,才會將記憶體值V的值修改為 B ,否則什麼都不幹。其虛擬碼如下:

if (this.value == A) {
	this.value = B
	return true;
} else {
	return false;
}
複製程式碼

J.U.C 下的 Atomic 類,都是通過 CAS 來實現的。下面就以 AtomicInteger 為例,來闡述 CAS 的實現。如下:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"
)); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; 複製程式碼

Unsafe 是 CAS 的核心類,Java 無法直接訪問底層作業系統,而是通過本地 native 方法來訪問。

不過儘管如此,JVM還是開了一個後門:Unsafe ,它提供了硬體級別的原子操作。

valueOffset 為變數值在記憶體中的偏移地址,Unsafe 就是通過偏移地址來得到資料的原值的。 value 當前值,使用 volatile 修飾,保證多執行緒環境下看見的是同一個。

AtomicInteger

我們就以 AtomicInteger 的 #addAndGet() 方法來做說明,先看原始碼:

// AtomicInteger.java
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this,valueOffset,delta) + delta;
}

// Unsafe.java
public final int getAndAddInt(Object var1,long var2,int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1,var2);
    } while(!this.compareAndSwapInt(var1,var2,var5,var5 + var4));

    return var5;
}
複製程式碼

內部呼叫 unsafe 的 #getAndAddInt(Object var1,long var2,int var4)方法,在#getAndAddInt(Object var1,int var4) 方法中,主要是看 #compareAndSwapInt(Object var1,int var4,int var5) 方法,程式碼如下:

public final native boolean compareAndSwapInt(Object var1,int var5);
複製程式碼

該方法為本地方法,有四個引數,分別代表:物件、物件的地址、預期值、修改值

CPU 原子操作

CAS 可以保證一次的讀-改-寫操作是原子操作,在單處理器上該操作容易實現,但是在多處理器上實現就有點兒複雜了。

CPU 提供了兩種方法來實現多處理器的原子操作:匯流排加鎖或者快取加鎖。

  1. 匯流排加鎖:匯流排加鎖就是就是使用處理器提供的一個 LOCK# 訊號,當一個處理器在匯流排上輸出此訊號時,其他處理器的請求將被阻塞住,那麼該處理器可以獨佔使用共享記憶體。但是這種處理方式顯得有點兒霸道,不厚道,他把CPU和記憶體之間的通訊鎖住了,在鎖定期間,其他處理器都不能使用其他記憶體地址的資料,其開銷有點兒大。所以就有了快取加鎖。

  2. 快取加鎖:其實針對於上面那種情況,我們只需要保證在同一時刻,對某個記憶體地址的操作是原子性的即可。快取加鎖,就是快取在記憶體區域的資料如果在加鎖期間,當它執行鎖操作寫回記憶體時,處理器不再輸 出LOCK# 訊號,而是修改內部的記憶體地址,利用快取一致性協議來保證原子性。快取一致性機制可以保證同一個記憶體區域的資料僅能被一個處理器修改,也就是說當 CPU1 修改快取行中的 i 時使用快取鎖定,那麼 CPU2 就不能同時快取了 i 的快取行。

CAS缺陷

CAS 雖然高效地解決了原子操作,但是還是存在一些缺陷的,主要表現在三個方面:

  1. 迴圈時間太長
  2. 只能保證一個共享變數原子操作
  3. ABA 問題

迴圈時間太長

如果CAS一直不成功呢?這種情況絕對有可能發生,如果自旋 CAS 長時間地不成功,則會給 CPU 帶來非常大的開銷。在 J.U.C 中,有些地方就限制了 CAS 自旋的次數,例如: BlockingQueue 的 SynchronousQueue 。

只能保證一個共享變數原子操作

看了 CAS 的實現就知道這隻能針對一個共享變數,如果是多個共享變數就只能使用鎖了,當然如果你有辦法把多個變數整成一個變數,利用 CAS 也不錯。例如讀寫鎖中 state 的高低位。

ABA問題

CAS 需要檢查操作值有沒有發生改變,如果沒有發生改變則更新。但是存在這樣一種情況:如果一個值原來是 A,變成了 B,然後又變成了 A,那麼在 CAS 檢查的時候會發現沒有改變,但是實質上它已經發生了改變,這就是所謂的ABA問題。對於 ABA 問題其解決方案是加上版本號,即在每個變數都加上一個版本號,每次改變時加 1 ,即 A —> B —> A ,變成1A —> 2B —> 3A 。

影響

用一個例子來闡述 ABA 問題所帶來的影響。

有如下連結串列如下圖:

假如我們想要把 A 替換為 B ,也就是 #compareAndSet(this,A,B) 。執行緒 1 執行 A 替換 B 操作之前,執行緒 2 先執行如下動作,A 、B 出棧,然後 C、A 入棧,最終該連結串列如下:

完成後,執行緒 1 發現仍然是 A ,那麼 #compareAndSet(this,B) 成功,但是這時會存在一個問題就是 B.next = null,因此 #compareAndSet(this,B) 後,會導致 C 丟失,改棧僅有一個 B 元素,平白無故把 C 給丟失了。

解決方案 AtomicStampedReference

CAS 的 ABA 隱患問題,解決方案則是版本號,Java 提供了 AtomicStampedReference 來解決。AtomicStampedReference 通過包裝 [E,Integer] 的元組,來對物件標記版本戳 stamp ,從而避免 ABA 問題。對於上面的案例,應該執行緒 1 會失敗。

AtomicStampedReference 的 #compareAndSet(...) 方法,程式碼如下:

public boolean compareAndSet(V   expectedReference,V   newReference,int expectedStamp,int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current,Pair.of(newReference,newStamp)));
}
複製程式碼

有四個方法引數,分別表示:預期引用、更新後的引用、預期標誌、更新後的標誌。原始碼部分很好理解:

  1. 預期的引用 == 當前引用
  2. 預期的標識 == 當前標識
  3. 如果更新後的引用和標誌和當前的引用和標誌相等,則直接返回 true
  4. 通過 Pair#of(T reference,int stamp) 方法,生成一個新的 Pair 物件,與當前 Pair CAS 替換。

Pair 為 AtomicStampedReference 的內部類,主要用於記錄引用和版本戳資訊(標識),定義如下:

// AtomicStampedReference.java

private static class Pair<T> {
    final T reference; // 物件引用
    final int stamp; // 版本戳
    private Pair(T reference,int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference,int stamp) {
        return new Pair<T>(reference,stamp);
    }
}

private volatile Pair<V> pair;
複製程式碼
  1. Pair 記錄了物件的引用和版本戳,版本戳為 int 型,保持自增。同時 Pair 是一個不可變物件,其所有屬性全部定義為 final 。對外提供一個 #of(T reference,int stamp) 方法,該方法返回一個新建的 Pair 物件。

  2. pair 屬性,定義為 volatile ,保證多執行緒環境下的可見性。在AtomicStampedReference 中,大多方法都是通過呼叫 Pair 的 #of(T reference,int stamp) 方法,來產生一個新的 Pair 物件,然後賦值給變數 pair 。如 set(V newReference,int newStamp)方法,程式碼如下:

// AtomicStampedReference.java
public void set(V newReference,int newStamp) {
    Pair<V> current = pair;
    if (newReference != current.reference || newStamp != current.stamp)
        this.pair = Pair.of(newReference,newStamp);
}
複製程式碼

實際案例

下面,我們將通過一個例子,可以看到 AtomicStampedReference 和 AtomicInteger 的區別。我們定義兩個執行緒,執行緒 1 負責將 100 —> 110 —> 100,執行緒 2 執行 100 —>120 ,看兩者之間的區別。

public class Test {
    private static AtomicInteger atomicInteger = new AtomicInteger(100);
    private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);

    public static void main(String[] args) throws InterruptedException {

        // AtomicInteger
        Thread at1 = new Thread(new Runnable() {
            @Override
            public void run() {
                atomicInteger.compareAndSet(100,110);
                atomicInteger.compareAndSet(110,100);
            }
        });

        Thread at2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(2);      // at1,執行完
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120));
            }
        });

        at1.start();
        at2.start();

        at1.join();
        at2.join();

        // AtomicStampedReference

        Thread tsf1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //讓 tsf2先獲取stamp,導致預期時間戳不一致
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 預期引用:100,更新後的引用:110,預期標識getStamp() 更新後的標識getStamp() + 1
                atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
                atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp() + 1);
            }
        });

        Thread tsf2 = new Thread(new Runnable() {
            @Override
            public void run() {
                int stamp = atomicStampedReference.getStamp();

                try {
                    TimeUnit.SECONDS.sleep(2);      //執行緒tsf1執行完
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));
            }
        });

        tsf1.start();
        tsf2.start();
    }

}
複製程式碼

執行結果:

執行結果充分展示了 AtomicInteger 導致的 ABA 問題,和使用 AtomicStampedReference 解決 ABA 問題。