一篇文章快速搞懂 Atomic(原子整數/CAS/ABA/原子引用/原子陣列/LongAdder)
前言
相信大部分開發人員,或多或少都看過或寫過併發程式設計的程式碼。併發關鍵字除了Synchronized,還有另一大分支Atomic。如果大家沒聽過沒用過先看基礎篇
,如果聽過用過,請滑至底部看進階篇
,深入原始碼分析。
提出問題:int執行緒安全嗎?
看過Synchronized相關文章的小夥伴應該知道其是不安全的,再次用程式碼應驗下其不安全性:
public class testInt { static int number = 0; public static void main(String[] args) throws Exception { Runnable runnable = new Runnable() { @Override public void run() { for (int i = 0; i < 100000; i++) { number = number+1; } } }; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println("number:" + number); } }
執行結果:
在上面的例子中,我們定義一個初始值為0的靜態變數number,再新建並執行兩個執行緒讓其各執行10萬次的自增操作,如果他是執行緒安全的,應該兩個執行緒執行後結果為20萬,但是我們發現最終的結果是小於20萬的,即說明他是不安全的。
在之前Synchronized那篇文章中說過,可以在number=number+1這句程式碼上下加Synchronized關鍵字實現執行緒安全。但是其對資源的開銷較大,所以我們今天再看下另外一種實現執行緒安全的方法Atomic。
Atomic基礎篇分界線
原子整數(基礎型別)
整體介紹
Atomic是jdk提供的一系列包的總稱,這個大家族包括原子整數(AtomicInteger,AtomicLong,AtomicBoolean),原子引用(AtomicReference,AtomicStampedReference,AtomicMarkableReference),原子陣列(AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray),更新器(AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater)。
AtomicInteger
AtomicInteger,AtomicBoolean,AtomicLong三者功能類似,咱就以AtomicInteger為主分析原子類。
先看下有哪些API,及其他們具體啥功能:
public class testInt { public static void main(String[] args) { //定義AtomicInteger型別的變數,值為1 AtomicInteger i = new AtomicInteger(1); //incrementAndGet方法先新增1再返回,所以列印2,此時i為2 System.out.println(i.incrementAndGet()); //getAndIncrement方法先返回值再新增1,所以列印2,此時i為3 System.out.println(i.getAndIncrement()); //get方法返回當前i值,所以列印3,此時i為3 System.out.println(i.get()); //引數為正數即新增,getAndAdd方法先返回值再新增666,所以列印3,此時i為669 System.out.println(i.getAndAdd(666)); //引數為負數即減去,getAndAdd方法先返回值再減去1,所以列印669,此時i為668 System.out.println(i.getAndAdd(-1)); //引數為正數即新增,addAndGet方法先新增666再返回值,所以列印1334,此時i為1334 System.out.println(i.addAndGet(666)); //引數為負數即減去,addAndGet方法先減去-1再返回值,所以列印1333,此時i為1333 System.out.println(i.addAndGet(-1)); //getAndUpdate方法IntUnaryOperator引數是一個箭頭函式,後面可以寫任何操作,所以列印1333,此時i為13331 System.out.println(i.getAndUpdate(x -> (x * 10 + 1))); //最終列印i為13331 System.out.println(i.get()); } }
執行結果:
對上述int型別的例子改進
public class testInt { //1.定義初始值為0的AtomicInteger型別變數number static AtomicInteger number = new AtomicInteger(0); public static void main(String[] args) throws Exception { Runnable runnable = new Runnable() { @Override public void run() { for (int i = 0; i < 100000; i++) { //2.呼叫incrementAndGet方法,實現加1操作 number.incrementAndGet(); } } }; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println("number:" + number.get()); } }
我們可以看到執行結果是正確的20萬,說明AtomicInteger的確保證了執行緒安全性,即在多執行緒的過程中,執行結果還是正確的。但是這存在一個ABA問題,下面將原子引用的時候再說,先立個flag。
原始碼分析
我們以incrementAndGet方法為例,看下底層是如何實現的,AtomicInteger類中的incrementAndGet方法呼叫了Unsafe類的getAndAddInt方法。
public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }
我們看下getAndAddInt方法,裡面有個迴圈,直接值為compareAndSwapInt返回值為true,才結束迴圈。這裡就不得不提CAS,這就是多執行緒安全性問題的解決方法。
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
CAS
執行緒1和執行緒2同事獲取了主記憶體變數值0,執行緒1加1並寫入主記憶體,現在主記憶體變數值1,執行緒2也加2並嘗試寫入主記憶體,這個時候是不能寫入主記憶體的,因為會覆蓋掉執行緒1的操作,具體過程如下圖。
CAS是線上程2嘗試寫入記憶體的時候,通過比較並設定(CompareAndSet)發現現在主記憶體當前值為1,和他剛開始讀取的值0不一樣,所以他會放棄本次修改,重新讀取主記憶體的最新值,然後再重試下執行緒2的具體邏輯操作,再次嘗試寫入主記憶體。如果這時候執行緒1,再次對主記憶體進行了修改,執行緒2發現現在主記憶體的值又和預期不一樣,所以將放棄本次修改,再次讀取主記憶體最新值,再次重試並嘗試寫入主記憶體。我們可以發現這是一個重複比較的過程,即直到和預期初始值一樣,才會寫入主記憶體,否則將一直讀取重試的迴圈。這就是上面for迴圈的意義。
CAS的實現實際上利用了CPU指令來實現的,如果作業系統不支援CAS,還是會加鎖的,如果作業系統支援CAS,則使用原子性的CPU指令。
原子引用
在日常使用中,我們不止對上述基本型別進行原子操作,而是需要對一些複雜型別進行原子操作,所以需要AtomicReference。
不安全實現
先看不安全的BigDecimal型別:
public class testReference { static BigDecimal number = BigDecimal.ZERO; public static void main(String[] args) throws Exception { Runnable runnable = new Runnable() { @Override public void run() { for (int i = 0; i < 1000; i++) { number=number.add(BigDecimal.ONE); } } }; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println(number); } }
執行結果如下圖,我們可以看到兩個執行緒,自迴圈1000次加1操作,最終結果應該是2000,可是結果小於2000。
安全實現-使用CAS
public class testReference { //定義AtomicReference型別BigDecimal變數 static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO); public static void main(String[] args) throws Exception { Runnable runnable = new Runnable() { @Override public void run() { for (int i = 0; i < 1000; i++) { //手動寫迴圈+CAS判斷 while(true){ BigDecimal pre=number.get(); BigDecimal next=number.get().add(BigDecimal.ONE); if(number.compareAndSet(pre,next)) { break; } } } } }; Thread t1 = new Thread(runnable); t1.start(); Thread t2 = new Thread(runnable); t2.start(); t1.join(); t2.join(); System.out.println(number.get()); } }
執行結果如下:
ABA問題及解決
在上面CAS過程中,是通過值比較來知曉是不是能夠更新成功,那如果執行緒1先加1再減1,這樣主記憶體還是原來的值,即執行緒2還是可以更新成功的。但是這樣邏輯錯了
,執行緒1已經發生了修改,執行緒2不能直接更新成功。
程式碼:
public class testInt { static AtomicInteger number = new AtomicInteger(0); public static void main(String[] args) throws Exception { Thread t1 = new Thread(new Runnable() { @Override public void run() { int a = number.get(); System.out.println("開始number:" + a); try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(number.compareAndSet(a, a++)); } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { System.out.println("開始增加操作"); int a = number.incrementAndGet(); System.out.println("當前number:" + a); int b = number.decrementAndGet(); System.out.println("當前number:" + b); } }); t2.start(); t1.join(); t2.join(); } }
我們看執行緒2對其進行了一系列操作,但是最後列印了還是true,表示可以更新成功的。這顯然不對。
那我們可以使用AtomicStampedReference,為其新增一個版本號。執行緒1在剛開始讀取主記憶體的時候,獲取到值為0,版本為1,執行緒2也獲取到這兩個值,執行緒1進行加1,減1的操作的時候,版本各加1,現在主記憶體的值為0,版本為2,而執行緒2還拿著預計值為0,版本為1的資料嘗試寫入主記憶體,這個時候因版本不同而更新失敗。具體我們用程式碼試下:
public class testInt { static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0); public static void main(String[] args) throws Exception { Thread t1 = new Thread(new Runnable() { @Override public void run() { int a = number.getReference(); int s = number.getStamp(); System.out.println("開始number:" + a + ",stamp:" + s); try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(number.compareAndSet(a, a + 1, s, s + 1)); } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { System.out.println("開始增加操作"); int a = number.getReference(); int s = number.getStamp(); number.compareAndSet(a, a + 1, s, s + 1); System.out.println("當前number:" + a + ",stamp:" + (s + 1)); a = number.getReference(); s = number.getStamp(); number.compareAndSet(a, a - 1, s, s + 1); System.out.println("當前number:" + a + ",stamp:" + (s+1)); } }); t2.start(); t1.join(); t2.join(); } }
我們可以看到每次操作都會更新stamp(版本號),在最後對比的時候不僅比較值,還比較版本號,所以是不能更新成功的,false.
原子陣列
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray三者類似,所以以AtomicIntegerArray為例,我們可以將下面AtomicIntegerArray看做是AtomicInteger型別的陣列,其底層很類似,就不詳細寫了。
AtomicIntegerArray array = new AtomicIntegerArray(10); array.getAndIncrement(0); // 將第0個元素原子地增加1 AtomicInteger[] array = new AtomicInteger[10]; array[0].getAndIncrement(); // 將第0個元素原子地增加1
欄位更新器和原子累加器比較簡單,這裡就不說了。
Atomic進階篇分界線
LongAdder原始碼分析
LongAdder使用
LongAdder是jdk1.8之後新加的,那為什麼要加他?這個問題,下面將回答,我們先看下如何使用。
public class testLongAdder { public static void main(String[] args) throws Exception { LongAdder number = new LongAdder(); Runnable runnable = new Runnable() { @Override public void run() { for (int j = 0; j < 10000; j++) { number.add(1L); } } }; Thread t1 = new Thread(runnable); Thread t2 = new Thread(runnable); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("number:" + number); } }
我們可以看到LongAdder的使用和AtomicLong大致相同,使用兩個執行緒Thread1,Thread2對number值各進行一萬次的自增操作,最後的number是正確的兩萬。
與Atomic的對比優勢
那問題來了,既然AtomicLong能夠完成對多執行緒下的number進行執行緒安全的操作,那為什麼還要LongAdder?我們先來段程式碼比較下,兩個在結果都是正確的前提下,效能方面的差距。
public class testLongAdder { public static void main(String[] args) { //1個執行緒,進行100萬次自增操作 test1(1,1000000); //10個執行緒,進行100萬次自增操作 test1(10,1000000); //100個執行緒,進行100萬次自增操作 test1(100,1000000); } static void test1(int threadCount,int times){ long startTime=System.currentTimeMillis(); AtomicLong number1=new AtomicLong(); List<Thread> threads1=new ArrayList<>(); for(int i=0;i<threadCount;i++) { threads1.add(new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < times; j++) { number1.incrementAndGet(); } } })); } threads1.forEach(thread -> thread.start()); threads1.forEach(thread ->{ try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } ); long endTime=System.currentTimeMillis(); System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime)); LongAdder number2=new LongAdder(); List<Thread> threads2=new ArrayList<>(); for(int i=0;i<threadCount;i++) { threads2.add(new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < times; j++) { number2.add(1); } } })); } threads2.forEach(thread -> thread.start()); threads2.forEach(thread ->{ try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } ); System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime)); } }
上述程式碼對比了1個執行緒,10個執行緒,100個執行緒在進行100百次自增操作後,AtomicLong和LongAdder所花費的時間。通過列印語句,我們發現在最終number1和number2都正確的基礎上,LongAdder花費的時間比AtomicLong少了一個量級。
原始碼分析
那為什麼會導致這種情況,我們就要從原始碼層面分析。AtomicLong為什麼效率低?因為如果執行緒數量一多,尤其在高併發的情況下,比如有100個執行緒同時想要對物件進行操作,肯定只有一個執行緒會獲取到鎖,其他99個執行緒可能空轉,一直迴圈知道執行緒釋放鎖。如果該執行緒操作完畢釋放了鎖,其他99個執行緒再次競爭,也只有一個執行緒獲取鎖,另外98個執行緒還是空轉,直到鎖被釋放。這樣CAS操作會浪費大量資源在空轉上,從而使得AtomicLong線上程數越來越多的情況下越來越慢。
AtomicLong是多個執行緒對同一個value值進行操作,導致多個執行緒自旋次數太多,效能降低。而LongAdder在無競爭的情況,跟AtomicLong一樣,對同一個base進行操作,當出現競爭關係時則是採用化整為零的做法,從空間換時間,用一個數組cells,將一個value拆分進這個陣列cells。多個執行緒需要同時對value進行操作時候,可以對執行緒id進行hash得到hash值,再根據hash值對映到這個陣列cells的某個下標,再對該下標所對應的值進行自增操作。當所有執行緒操作完畢,將陣列cells的所有值和無競爭值base都加起來作為最終結果。
我們先看下LongAdder裡面的欄位,發現其裡面沒有,主要是在其繼承的Stripped64類中,有下面四個主要變數。
/** CPU數量,即cells陣列的最大長度*/ static final int NCPU = Runtime.getRuntime().availableProcessors(); /** *cells陣列,為2的冪,2,4,8,16.....,方便以後位運算 */ transient volatile Cell[] cells; /** * 基值,主要用於沒有競爭的情況,通過CAS更新。 */ transient volatile long base; /** * 調整單元格大小(擴容),建立單元格時使用的鎖。 */ transient volatile int cellsBusy;
下面是add方法開始。
public void add(long x) { //as:cells陣列的引用 //b:base的基礎值 //v:期望值 //m:cells陣列大小 //a:當前陣列命中的單元 Cell[] as; long b, v; int m; Cell a; //as不為空(cells已經初始化過,說明之前有其他執行緒對初始化)或者CAS操作不成功(執行緒間出現競爭) if ((as = cells) != null || !casBase(b = base, b + x)) { //初始化uncontented,true表示未競爭(因為有兩個情況,這裡先初始化,後面對其修改,就能區分這兩種情況) boolean uncontended = true; //as等於null(cells未初始化) //或者執行緒id雜湊出來的下標所對應的值為空(cell等於空),getProbe() & m功能是獲取下標,底層邏輯是位運算 //或者更新失敗為false,即發生競爭,取非就為ture if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) //進入到if裡面,說明更新case失敗,或者更新某個cell也失敗了,或者cell為空,或者cells為空 longAccumulate(x, null, uncontended); } }
從LongAdder呼叫Stripped64的longAccumulate方法,主要是初始化cells
,cells的擴容
,多個執行緒同時命中一個cell的競爭
操作。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //x:add的值,fn:為null,wasUncontended:是否發生競爭,true為發生競爭,false為不發生競爭 int h;//執行緒的hash值 //如果該執行緒為0,即第一次進來,所以ThreadLocalRandom強制初始化執行緒id,再對其hash if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } //擴容意向,為false肯定不擴容,為true可能擴容 boolean collide = false; //死迴圈 for (;;) { //as:cells陣列的引用 //a:當前執行緒命中的cell //n:cells的長度 //v:當前執行緒命中的cell所擁有的value值 Cell[] as; Cell a; int n; long v; //cells不為空 if ((as = cells) != null && (n = as.length) > 0) { //當前執行緒命中的cell為空,下面邏輯是新增cell if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } //發生競爭 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //沒有競爭,嘗試修改當前執行緒對應的cell值,成功跳出迴圈 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //如果n大於CPU最大數量,不可擴容 else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; //獲取到了鎖,進行擴容,為2的冪, else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1];//左移一位運算子,數量加倍 for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } //cells等於空,並且獲取到鎖,開始初始化工作,建立結束釋放鎖,繼續迴圈 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
結語
結束了,撒花。這篇主要說了Atomic的一些使用,包括Atomic原子類(AtomicInteger,AtomicLong,AtomicBoolean),Atomic原子引用(AtomicReference,AtomicStampedReference),以及1.8之後LongAdder的優勢,原始碼分析。過程還穿插了一些CAS,ABA問題引入和解決方式。
參考資料
Java多執行緒進階(十七)—— J.U.C之atomic框架:LongAdder
CAS原理
Java 8 Performance Improvements: LongAdder vs AtomicLong
AtomicInteger深入理解
原子操作類AtomicInteger詳解
玩轉Java併發工具,精通JUC,成為併發多