Java併發程式設計之CAS
在Java併發程式設計的世界裡,synchronized 和 Lock 是控制多執行緒併發環境下對共享資源同步訪問的兩大手段。其中 Lock 是 JDK 層面的鎖機制,是輕量級鎖,底層使用大量的自旋+CAS操作實現的。
學習併發推薦《Java併發程式設計的藝術》
那什麼是CAS呢?CAS,compare and swap,即比較並交換,什麼是比較並交換呢?在Lock鎖的理念中,採用的是一種樂觀鎖的形式,即多執行緒去修改共享資源時,不是在修改之前就加鎖,而是樂觀的認為沒有別的執行緒和自己爭鎖,就是通過CAS的理念去保障共享資源的安全性的。CAS的基本思想是,拿變數的原值和記憶體中的值進行比較,如果相同,則原值沒有被修改過,那麼就將原值修改為新值,這兩步是原子的,能夠保證同一時間只有一個執行緒修改成功。這就是CAS的理念。
Java中要想使用CAS原子的修改某值,怎麼做呢?幸運的是Java提供了這樣的API,就是在sun.misc.Unsafe.java類中。Unsafe,中文名不安全的,也被稱為魔術類,魔法類。
Unsafe類介紹
Unsafe類使Java擁有了像C語言的指標一樣操作記憶體空間的能力,一旦能夠直接操作記憶體,這也就意味著
(1)不受JVM管理,意思就是使用Unsafe操作記憶體無法被JVM GC,需要我們手動GC,稍有不慎就會出現記憶體洩漏。
(2)Unsafe的不少方法中必須提供原始地址(記憶體地址)和被替換物件的地址,並且偏移量要自己計算(其提供的有計算偏移量的方法),所以一旦出現問題就是JVM崩潰級別的異常,會導致整個JVM例項崩潰,表現為應用程式直接crash掉。
因此,從上面三個角度來看,雖然在一定程度上提升了效率但是也帶來了指標的不安全性。這也是它被取名為Unsafe的原因吧。
下面我們深入到原始碼中看看,提供了什麼方法直接操作記憶體。
開啟Unsafe這個類,我們會發現裡面有大量的被native關鍵字修飾的方法,這意味著這些方法是C語言提供的實現,底層調的是C語言的庫函式,我們無法直接看到他的原始碼實現,需要去從OpenJDK去看了。另外還有一些基於native方法封裝的其他方法,整個Unsafe中的方法大致可以歸結為以下幾類:
(1)初始化操作
(2)操作物件屬性
(3)運算元組元素
(5)CAS機制
CAS的使用
如果你學過java併發程式設計的話,稍微閱讀過JUC併發包裡面的原始碼的話,對這個Unsafe類一定不陌生,因為整個java併發包底層實現的核心就是靠它。JUC併發包中主要使用它提供的CAS(compare and swap,比較並交換)操作,原子的修改鎖的狀態和一些佇列元素。
沒看過JUC原始碼的讀者也不用擔心,今天我們就是簡單介紹Unsafe類中的CAS操作,那麼我們接下來就會通過一個簡單的例子來看看Unsafe的CAS是怎麼使用的。
首先,使用這個類我們第一個要做的事情就是拿到這個類的例項,下面我們自定義了一個Util類用來獲取Unsafe的例項
import sun.misc.Unsafe; import java.lang.reflect.Field; public class UnsafeUtil { public static Unsafe reflectGetUnsafe() { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (Exception e) { e.printStackTrace(); } return null; } }
這個工具類通過反射的方式拿到Unsafe類中的一個名為theUnsafe欄位,該欄位是Unsafe型別,並在static塊中new一個Unsafe物件初始化這個欄位(單例模式)。
然後我們定義了一個AtomicState類,這個類很簡單,有一個int型的state欄位,還有一個Unsafe的常量,以及int型的offsetState,用來記錄state欄位在AtomicState物件中的偏移量。具體程式碼如下:
import com.walking.juc.util.UnsafeUtil; import sun.misc.Unsafe; public class AtomicState { private volatile int state = 0; public int getState() { return state; } private static final Unsafe UNSAFE = UnsafeUtil.reflectGetUnsafe(); private static final long offsetState; static { try { offsetState = UNSAFE.objectFieldOffset(AtomicState.class.getDeclaredField("state")); } catch (NoSuchFieldException e) { throw new Error(e); } } public final boolean compareAndSetState(int oldVal, int newVal) { return UNSAFE.compareAndSwapInt(this, offsetState, oldVal, newVal); } }
我們定義了一個compareAndSetState
方法,需要傳兩個引數,分別是state的舊值和新值,也就是讀到的state的之前的值,以及想要把它修改成什麼值,該方法內部呼叫的是Unsafe類的compareAndSwapInt
方法,它有四個引數,分別是要修改的類例項物件、要修改的值的偏移量、舊值、新值。解釋一下偏移量,剛才我們提到Unsafe提供給我們直接訪問記憶體的能力,那麼訪問記憶體肯定是要知道記憶體的地址在哪才能去修改其相應的值吧,我們看,第一個引數是物件例項引用,也就是說,已經知道這個物件的地址了,那麼我們想修改這個物件裡的state的值,就只需要計算出state在這個物件的偏移量就能找到state所在的記憶體地址,那就可以修改它了。
然後,我們通過一個測試類來驗證Unsafe的CAS操作。這個測試類我來解釋下大致的思想,我們弄5個執行緒,讓這個5個執行緒一個個啟動,我們無法保證執行緒同時開始啟動,那麼我們有辦法保證這個5個執行緒同時執行我們的程式碼,就是使用JUC包裡的CyclicBarrier
工具來實現的,這個工具初始化時需要傳入一個int值n,我們線上程的run方法內部在業務程式碼執行之前呼叫CyclicBarrie
r的await方法,當指定數量n的執行緒都呼叫了這個方法那麼這n個執行緒將同時往下執行,就像設定了一個屏障,所有人都達到這個屏障後,一起通過屏障,依次來模擬多執行緒併發
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @Slf4j public class TestAtomicState { static int tNum = 5;//執行緒數 我們開10個執行緒模擬多執行緒併發 static CyclicBarrier cyclicBarrier = new CyclicBarrier(tNum);//柵欄 static CountDownLatch countDownLatch = new CountDownLatch(tNum);//計數器 static AtomicState atomicState = new AtomicState(); public static void main(String[] args) throws InterruptedException { for (int i = 1; i <= tNum; i++) { new Thread(new MyTask(),"t-"+i).start(); } countDownLatch.await();//為的是讓主執行緒在這句阻塞住,等待所有執行緒執行完畢(計數器減到0)再往下走 log.info("state最後的值:" + atomicState.getState()); } static class MyTask implements Runnable{ @Override public void run() { try { log.info(Thread.currentThread().getName() + "到達起跑線"); String name = Thread.currentThread().getName(); String substring = name.substring(name.indexOf("-") + 1); int i1 = Integer.parseInt(substring); cyclicBarrier.await();//設定一個屏障,所有執行緒達到這後開始一起往下執行 模擬併發 boolean b = atomicState.compareAndSetState(0, i1); if (b) { log.info("修改成功,tName:{}" ,Thread.currentThread().getName()); } else { log.info("修改失敗,tName:{}" ,Thread.currentThread().getName()); } } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } finally { countDownLatch.countDown();//執行緒執行完畢計數器減一 } } } }
在cyclicBarrier.await();
之後我們呼叫AtomicState
的compareAndSetState
方法傳入舊值0和新值,新值就是執行緒名t-n中的n,哪個執行緒修改成功,最後state值就是執行緒名中的數字。
至於CountDownLatch
使用它的目的是讓mian執行緒等到t-1到t-5的執行緒全部執行完後列印state的值。我們的重點不是CyclicBarrier
和CountDownLatch
,知道它們是幹什麼的就行。
然後我們執行這個測試程式:
13:57:46.619 [t-2] INFO com.walking.castest.TestAtomicState - t-2到達起跑線 13:57:46.619 [t-3] INFO com.walking.castest.TestAtomicState - t-3到達起跑線 13:57:46.619 [t-5] INFO com.walking.castest.TestAtomicState - t-5到達起跑線 13:57:46.619 [t-1] INFO com.walking.castest.TestAtomicState - t-1到達起跑線 13:57:46.619 [t-4] INFO com.walking.castest.TestAtomicState - t-4到達起跑線 13:57:46.628 [t-1] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-1 13:57:46.628 [t-4] INFO com.walking.castest.TestAtomicState - 修改成功,tName:t-4 13:57:46.628 [t-2] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-2 13:57:46.628 [t-5] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-5 13:57:46.628 [t-3] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-3 13:57:46.636 [main] INFO com.walking.castest.TestAtomicState - state最後的值:4
可以看到只有一個執行緒執行成功,這就是CAS的基本使用。
CAS的ABA問題
何為ABA問題呢?舉個例子,小明和小花合夥賣煎餅,不就後攢了10萬元,他們一起去銀行把錢存在他們公共的賬戶裡,但是小明聽說最近牛市來了,就偷偷的把錢轉移到了股票市場,公共賬戶餘額是0。1個月後股票賺了一筆錢,然後小明把之前轉移的10萬元又存到他們的公共賬戶。小明和小花一個月後又去存錢,去查賬戶餘額是10萬。這就是ABA問題,簡單來說就是一個值本來是A,兩個執行緒同時都看到是A,然後執行緒1把A改成B後又改成A,執行緒1結束了。然後執行緒2去修改時,看到的是A,無法感知到這個過程中值發生過變化,對於執行緒2來說就發生了ABA的問題。
模擬ABA問題:
import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class CAS_ABA_Stampe { static AtomicInteger atomicInteger = new AtomicInteger(10); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { try { log.info("{}拿到state的值為:{}", Thread.currentThread().getName(), atomicInteger.get()); log.info("{}第一次修改", Thread.currentThread().getName()); atomicInteger.getAndSet(0); Thread.sleep(2000); log.info("{}第二次修改", Thread.currentThread().getName()); atomicInteger.getAndSet(10); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1"); t1.start(); Thread t2 = new Thread(() -> { try { log.info("{}第一次拿到state的值為:{}", Thread.currentThread().getName(), atomicInteger.get()); Thread.sleep(2500); log.info("{}第二次拿到state的值為:{}", Thread.currentThread().getName(), atomicInteger.get()); log.info("{}開始修改state的值為2", Thread.currentThread().getName()); atomicInteger.getAndSet(20); log.info("{}修改成功", Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2"); t2.start(); t1.join(); t2.join(); log.info("最終state的值:{}", atomicInteger.get()); } }
//結果t2也能修改成功,並沒有發現這種變化 15:12:35.999 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1拿到state的值為:10 15:12:35.999 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2第一次拿到state的值為:10 15:12:36.014 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1第一次修改 15:12:38.015 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1第二次修改 15:12:38.515 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2第二次拿到state的值為:10 15:12:38.515 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2開始修改state的值為2 15:12:38.516 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2修改成功 15:12:38.516 [main] INFO com.walking.castest.CAS_ABA_Stampe - 最終state的值:20
怎麼解決CAS的ABA問題呢?
那就是基於版本號去解決,增加一個版本號的概念,每次被修改這個版本號就加1,版本號是一直向前的,版本號變了,就說明被修改過。
JUC包中提供瞭解決ABA問題的工具:
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicStampedReference; @Slf4j public class CAS_ABA_Stampe { static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(10, 1); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { try { int stamp = stampedReference.getStamp(); int intValue = stampedReference.getReference().intValue(); log.info("{}私挪公款拿到stamp的值為:{},餘額:{}", Thread.currentThread().getName(), stamp,intValue); stampedReference.compareAndSet(10, 0, stamp, stamp + 1); Thread.sleep(2000); stamp = stampedReference.getStamp(); intValue = stampedReference.getReference().intValue(); log.info("{}還回公款拿到stamp的值為:{},餘額:{}", Thread.currentThread().getName(), stamp,intValue); stampedReference.compareAndSet(0, 10, stamp, stamp + 1); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1"); t1.start(); Thread t2 = new Thread(() -> { try { int stamp = stampedReference.getStamp(); int intValue = stampedReference.getReference().intValue(); log.info("{}拿到stamp的值為:{},餘額:{}", Thread.currentThread().getName(), stamp, intValue); Thread.sleep(3000); log.info("{}開始存款", Thread.currentThread().getName()); if (stampedReference.compareAndSet(10, 20, stamp, stamp + 1)) { log.info("{}款款成功", Thread.currentThread().getName()); }else { log.info("{}存款失敗,發現賬戶異常!!oldStamp:{},currentStamp:{}", Thread.currentThread().getName(),stamp,stampedReference.getStamp()); } } catch (InterruptedException e) { e.printStackTrace(); } }, "t2"); t2.start(); t1.join(); t2.join(); log.info("最終賬戶餘額:{}W", stampedReference.getReference().intValue()); } }
執行結果:
15:32:37.488 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1私挪公款拿到stamp的值為:1,餘額:10 15:32:37.476 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2拿到stamp的值為:1,餘額:10 15:32:39.500 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1還回公款拿到stamp的值為:2,餘額:0 15:32:40.498 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2開始存款 15:32:40.498 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2存款失敗,發現賬戶異常!!oldStamp:1,currentStamp:3 15:32:40.498 [main] INFO com.walking.castest.CAS_ABA_Stampe - 最終賬戶餘額:10W
t2存款時就發現賬戶異常,因為版本號已經變成了3,和t2剛開始拿到的不一樣,說明已經被別人修改過,從而解決ABA問題。
到這裡CAS就完啦。別忘了點贊,轉發。
往期熱文:
歡迎關注公眾號,謝謝支援。