1. 程式人生 > 程式設計 >併發 - CAS 的操作、實現、原理及優化

併發 - CAS 的操作、實現、原理及優化

簡介

在 Java 中很多工具類都在使用 CAS(Compare And Set)用以提升併發的效率以及資料的準確性質。

  • concurrent 和 concurrent.atomic 下面的很多 AtomicInteger 等類
  • concurrent.locks 包下面的 ReentrantLock 、WriteLock 等
  • 其它

對於大部分人來說,最常見的應該就是使用 AtomicXXX、以及在使用 Lock 相關的子類 的時候我們知道他們的底層運用了 CAS,也知道 CAS 就是傳入一個更新前得期待值(expect)和一個需要更新的值(update),如果滿足要求那麼執行更新,否則的話就算執行失敗,來達到資料的原子性。

我們知道 CAS 肯定用某一種方式在底層保證了資料的原子性,它的好處是

  • 不必做同步阻塞的掛起以及喚醒執行緒這樣大量的開銷
  • 將保證資料原子性的這個操作交給了底層硬體效能遠遠高於做同步阻塞掛起、喚醒等操作,所以它的併發性更好
  • 可以根據 CAS 返回的狀態決定後續操作來達到資料的一致性,比如 increment 失敗那就一值迴圈直到成功為止(下文會講)等等

首先來看一個錯誤的 increment()

    private int value = 0;

    public static void main(String[] args) {
        Test test = new Test();
        test.increment();
        System.out.println("期待值:"
+ 100 * 100 + ",最終結果值:" + test.value); } private void increment() { for (int i = 0; i < 100; i++) { new Thread(() -> { for (int j = 0; j < 100; j++) { value++; } }).start(); } } 複製程式碼

輸出:期待值:10000,最終結果值:9900

可以發現輸出的結果值錯誤,這是因為 value++ 不是一個原子操作,它將 value++ 拆分成了 3 個步驟 load、add、store,多執行緒併發有可能上一個執行緒 add 過後還沒有 store 下一個執行緒又執行了 load 了這種重複造成得到的結果可能比最終值要小。

當然在這裡加 volatile int value 也是沒有用的因為 32 位的 int 操作本身就是原子的,而且 volatile 也沒有辦法讓這 3 個操作原子性執行,它只能禁止某個指令重排序來保證其對應的記憶體可見,如果是 long 等 64 位操作型別的可以加上 volatile,因為在 32 位的機器上寫操作可能會被分配到不同的匯流排事務上去操作(可以想象成分成了 2 步操作,第一步操作前 32 位後一步操作後 32 位),而匯流排事務的執行是由匯流排仲裁決定的不能保證它的執行順序(相當於前者加了 32 位可能就切換到其它的地方執行了,比如直接就讀取了,那麼資料的讀取就只讀取到了寫入一半的值)

使用 CAS 來保證 increment() 正確

我們知道關於 CAS 的操作基本上都封裝在 Unsafe 這個包裡面,但是由於 Unsafe 不允許我們外部使用,它認為這是一個不安全的操作,比如如果直接使用 Unsafe unsafe = Unsafe.getUnsafe(); 就會丟擲 Exception in thread "main" java.lang.SecurityException: Unsafe

我們檢視下原始碼,原來是因為它做了校驗

    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
            throw new SecurityException("Unsafe");
        } else {
            return theUnsafe;
        }
    }
複製程式碼

所以我們可以通過反射來呼叫它(當然實際操作中不建議這麼使用,此處為了演示方便)

public class Test {

    // value 的記憶體地址,便於直接找到 value
    private static long valueOffset = 0;

    {
        try {
            // 這個記憶體地址是和 value 這個成員變數的值繫結在一起的
            valueOffset = getUnsafe().objectFieldOffset
                (Test.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
    private int value;

    public static void main(String[] args) throws NoSuchFieldException,IllegalAccessException {
        Test test = new Test();
        test.increment();
    }

    private void increment() throws NoSuchFieldException,IllegalAccessException {
        Unsafe unsafe = getUnsafe();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    unsafe.getAndAddInt(this,valueOffset,1);
                }
            }).start();
        }
        System.out.println("需要得到的結果為: " + 100 * 1000);
        System.out.println("實際得到的結果為: " + value);
    }

    // 反射獲取 Unsafe
    private Unsafe getUnsafe() throws NoSuchFieldException,IllegalAccessException {
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        return (Unsafe) field.get(null);
    }
}
複製程式碼

這下我們就能從輸出中看到結果是正確的了

CAS 底層的實現原理

我們繼續探討, getAndAddInt 呼叫了 unsafe.compareAndSwapInt(Object obj,long valueOffset,int expect,int update) 這個方法在 Hotspot 到底是如何實現的,我們發現呼叫的是 native 的 unsafe.compareAndSwapInt(Object obj,int update),我們翻看 Hotspot 原始碼發現在 unsafe.cpp 中定義了這樣一段程式碼

UNSAFE_ENTRY(jboolean,Unsafe_CompareAndSwapInt(JNIEnv *env,jobject unsafe,jobject obj,jlong offset,jint e,jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p,offset);
  return (jint)(Atomic::cmpxchg(x,addr,e)) == e;
UNSAFE_END
複製程式碼

從中我們可以看到它是使用了 Atomic::cmpxchg(x,e) 這個操作來完成的,在不同的底層硬體會有不一樣的程式碼 Hotspot 向上幫我們遮蔽了細節。這個實現方法在 solaris,windows,linux_x86 等都有不一樣的實現方法,我們用我們最常見的伺服器 linux_x86 來說,它的實現程式碼如下

inline jint Atomic::cmpxchg (jint exchange_value,volatile jint*  dest,jint compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value),"a" (compare_value),"r" (dest),"r" (mp)
                    : "cc","memory");
  return exchange_value;
}
複製程式碼

從以上程式碼可以看出幾點

  • Hotspot 直接呼叫底層彙編來實現對應的功能
  • __asm__ 表示的是後續是一段彙編程式碼
  • volatile 此處的 volatile 和 Java 中的有些區別,這裡使用用以告訴編譯器不再對這段程式碼進行彙編優化
  • LOCK_IF_MP 表示的是如果作業系統是多核的那麼就需要加鎖來保證其原子性
  • cmpxchgl 就是彙編中的比較並且交換

從這裡就能看出來,CAS 底層也是在用鎖來保證其原子性的。在 Intel 早期的實現中是直接將匯流排鎖住,這樣導致其它沒有獲得匯流排事務訪問權的處理器無法執行後續的操作,效能會極大的降低。

後續 Intel 對其進行了優化升級,在 x86 處理器中可以只需要鎖定 特定的記憶體地址,那麼其它處理器也就可以繼續使用匯流排來訪問記憶體資料了,只不過是如果其它匯流排也要訪問被鎖住的記憶體地址資料時會阻塞而已,這樣來大幅度的提升了效能。

但是思考一下以下幾點問題的

  1. 併發量非常高,可能導致都在不停的爭搶該值,可能導致很多執行緒一致處於迴圈狀態而無法更新資料,從而導致 CPU 資源的消耗過高
  2. ABA 問題,比如說上一個執行緒增加了某個值,又改變了某個值,然後後面的執行緒以為資料沒有發生過變化,其實已經被改動了

JAVA8 對於 CAS 的優化

當然 ABA 的問題可以使用增加版本號來控制,每次操作版本號 + 1,版本號變更了說明值就被改過一次了,在 Java 中 AtomicStampedReference 這個類提供了這種問題的解決方案。

而對於說第一個問題來說在 Java8 中也有了對應的優化,Java 8 中提供了一些新的工具類用以解決這種問題,如下

我們挑一個來看,其它都是類似的

可以看到他是可以序列化的,並且必須是 Number 型別的,繼承 Striped64 能夠支援動態的分段

它的原理主要採用CAS分段機制與自動分段遷移機制,最開始是在 base 上面進行 CAS 操作,後續併發執行緒過多,那麼就將這大量的執行緒分配到 cells 陣列中去,每個陣列的執行緒單獨去執行累加操作,最終再合併結果

圖來自【基礎鞏固篇】Java 8中對CAS的優化

總結

可以看到跟做直接做同步掛起或者喚醒執行緒相比如果能夠合理的使用 CAS 進行操作的話或者是將其二者合併使用,那麼在併發效能上能夠提升一個量級

  • 對於像 ReentrantLock 之類的都是使用的將同步阻塞 + CAS 這種方式來實現高效能的鎖,比如 ReentrantLock 中 tryAcuqire() 如果使用 CAS 未能獲取到對應的鎖,那麼就將其放入阻塞佇列,等待後續的喚醒
  • 比如自旋鎖在指定的次數通過 CAS 都未能獲取到鎖的話就掛起進入阻塞佇列等待被喚醒
  • 比如使用 AtomicInteger 進行自增的時候就會一值不停的輪詢判斷更新,直到操作成功為止
  • 使用輪詢 CAS 處理而不嵌入阻塞掛起和喚醒的話,它的優勢就是在於能夠快速響應使用者請求減少資源消耗,因為執行緒的掛起和喚醒涉及到使用者態核心態的呼叫又涉及到執行緒“快照”資料的相關儲存,對於響應和資源消耗是又慢又高,不過我們也需要考慮在 CPU 輪詢上的開銷,所以可以將二者一定程度上的融合在一起使用。
  • 所以理解 CAS 還是非常重要的

參考: JAVA 中的 CAS