1. 程式人生 > >一篇文章快速搞懂 Atomic(原子整數/CAS/ABA/原子引用/原子陣列/LongAdder)

一篇文章快速搞懂 Atomic(原子整數/CAS/ABA/原子引用/原子陣列/LongAdder)

前言

相信大部分開發人員,或多或少都看過或寫過併發程式設計的程式碼。併發關鍵字除了Synchronized,還有另一大分支Atomic。如果大家沒聽過沒用過先看基礎篇,如果聽過用過,請滑至底部看進階篇,深入原始碼分析。

提出問題:int執行緒安全嗎?

看過Synchronized相關文章的小夥伴應該知道其是不安全的,再次用程式碼應驗下其不安全性:

public class testInt {
    static int number = 0;

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    number = number+1;
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number);
    }
}

 

 

執行結果:

在上面的例子中,我們定義一個初始值為0的靜態變數number,再新建並執行兩個執行緒讓其各執行10萬次的自增操作,如果他是執行緒安全的,應該兩個執行緒執行後結果為20萬,但是我們發現最終的結果是小於20萬的,即說明他是不安全的。

在之前Synchronized那篇文章中說過,可以在number=number+1這句程式碼上下加Synchronized關鍵字實現執行緒安全。但是其對資源的開銷較大,所以我們今天再看下另外一種實現執行緒安全的方法Atomic。

Atomic基礎篇分界線

原子整數(基礎型別)

整體介紹

Atomic是jdk提供的一系列包的總稱,這個大家族包括原子整數(AtomicInteger,AtomicLong,AtomicBoolean),原子引用(AtomicReference,AtomicStampedReference,AtomicMarkableReference),原子陣列(AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray),更新器(AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater)。

AtomicInteger

AtomicInteger,AtomicBoolean,AtomicLong三者功能類似,咱就以AtomicInteger為主分析原子類。

先看下有哪些API,及其他們具體啥功能:

public class testInt {

    public static void main(String[] args) {
        //定義AtomicInteger型別的變數,值為1
        AtomicInteger i = new AtomicInteger(1);
        //incrementAndGet方法先新增1再返回,所以列印2,此時i為2
        System.out.println(i.incrementAndGet());
        //getAndIncrement方法先返回值再新增1,所以列印2,此時i為3
        System.out.println(i.getAndIncrement());
        //get方法返回當前i值,所以列印3,此時i為3
        System.out.println(i.get());
        //引數為正數即新增,getAndAdd方法先返回值再新增666,所以列印3,此時i為669
        System.out.println(i.getAndAdd(666));
        //引數為負數即減去,getAndAdd方法先返回值再減去1,所以列印669,此時i為668
        System.out.println(i.getAndAdd(-1));
        //引數為正數即新增,addAndGet方法先新增666再返回值,所以列印1334,此時i為1334
        System.out.println(i.addAndGet(666));
        //引數為負數即減去,addAndGet方法先減去-1再返回值,所以列印1333,此時i為1333
        System.out.println(i.addAndGet(-1));
        //getAndUpdate方法IntUnaryOperator引數是一個箭頭函式,後面可以寫任何操作,所以列印1333,此時i為13331
        System.out.println(i.getAndUpdate(x -> (x * 10 + 1)));
        //最終列印i為13331
        System.out.println(i.get());
    }
} 

 

 

執行結果:

對上述int型別的例子改進

public class testInt {
    //1.定義初始值為0的AtomicInteger型別變數number
    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    //2.呼叫incrementAndGet方法,實現加1操作
                    number.incrementAndGet();
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number.get());
    }
}

 

 

我們可以看到執行結果是正確的20萬,說明AtomicInteger的確保證了執行緒安全性,即在多執行緒的過程中,執行結果還是正確的。但是這存在一個ABA問題,下面將原子引用的時候再說,先立個flag。

原始碼分析

我們以incrementAndGet方法為例,看下底層是如何實現的,AtomicInteger類中的incrementAndGet方法呼叫了Unsafe類的getAndAddInt方法。

public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

 

我們看下getAndAddInt方法,裡面有個迴圈,直接值為compareAndSwapInt返回值為true,才結束迴圈。這裡就不得不提CAS,這就是多執行緒安全性問題的解決方法。

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
}

 

CAS

執行緒1和執行緒2同事獲取了主記憶體變數值0,執行緒1加1並寫入主記憶體,現在主記憶體變數值1,執行緒2也加2並嘗試寫入主記憶體,這個時候是不能寫入主記憶體的,因為會覆蓋掉執行緒1的操作,具體過程如下圖。

CAS是線上程2嘗試寫入記憶體的時候,通過比較並設定(CompareAndSet)發現現在主記憶體當前值為1,和他剛開始讀取的值0不一樣,所以他會放棄本次修改,重新讀取主記憶體的最新值,然後再重試下執行緒2的具體邏輯操作,再次嘗試寫入主記憶體。如果這時候執行緒1,再次對主記憶體進行了修改,執行緒2發現現在主記憶體的值又和預期不一樣,所以將放棄本次修改,再次讀取主記憶體最新值,再次重試並嘗試寫入主記憶體。我們可以發現這是一個重複比較的過程,即直到和預期初始值一樣,才會寫入主記憶體,否則將一直讀取重試的迴圈。這就是上面for迴圈的意義。

CAS的實現實際上利用了CPU指令來實現的,如果作業系統不支援CAS,還是會加鎖的,如果作業系統支援CAS,則使用原子性的CPU指令。

原子引用

在日常使用中,我們不止對上述基本型別進行原子操作,而是需要對一些複雜型別進行原子操作,所以需要AtomicReference。

不安全實現

先看不安全的BigDecimal型別:

public class testReference {
    static BigDecimal number = BigDecimal.ZERO;

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                   number=number.add(BigDecimal.ONE);
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number);
    }
} 

 

 

執行結果如下圖,我們可以看到兩個執行緒,自迴圈1000次加1操作,最終結果應該是2000,可是結果小於2000。

安全實現-使用CAS

public class testReference {
    //定義AtomicReference型別BigDecimal變數
    static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO);

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    //手動寫迴圈+CAS判斷
                    while(true){
                        BigDecimal pre=number.get();
                        BigDecimal next=number.get().add(BigDecimal.ONE);
                      if(number.compareAndSet(pre,next))  {
                          break;
                      }
                    }
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number.get());

    }
}

 

 

執行結果如下:

ABA問題及解決

在上面CAS過程中,是通過值比較來知曉是不是能夠更新成功,那如果執行緒1先加1再減1,這樣主記憶體還是原來的值,即執行緒2還是可以更新成功的。但是這樣邏輯錯了,執行緒1已經發生了修改,執行緒2不能直接更新成功。

程式碼:

public class testInt {

    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.get();
                System.out.println("開始number:" + a);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(number.compareAndSet(a, a++));


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("開始增加操作");
                int a = number.incrementAndGet();
                System.out.println("當前number:" + a);
                int b = number.decrementAndGet();
                System.out.println("當前number:" + b);
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

 

 

我們看執行緒2對其進行了一系列操作,但是最後列印了還是true,表示可以更新成功的。這顯然不對。

那我們可以使用AtomicStampedReference,為其新增一個版本號。執行緒1在剛開始讀取主記憶體的時候,獲取到值為0,版本為1,執行緒2也獲取到這兩個值,執行緒1進行加1,減1的操作的時候,版本各加1,現在主記憶體的值為0,版本為2,而執行緒2還拿著預計值為0,版本為1的資料嘗試寫入主記憶體,這個時候因版本不同而更新失敗。具體我們用程式碼試下:

public class testInt {

    static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.getReference();
                int s = number.getStamp();
                System.out.println("開始number:" + a + ",stamp:" + s);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(number.compareAndSet(a, a + 1, s, s + 1));


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("開始增加操作");
                int a = number.getReference();
                int s = number.getStamp();
                number.compareAndSet(a, a + 1, s, s + 1);
                System.out.println("當前number:" + a + ",stamp:" + (s + 1));
                a = number.getReference();
                s = number.getStamp();
                number.compareAndSet(a, a - 1, s, s + 1);
                System.out.println("當前number:" + a + ",stamp:" + (s+1));
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

 

我們可以看到每次操作都會更新stamp(版本號),在最後對比的時候不僅比較值,還比較版本號,所以是不能更新成功的,false.

原子陣列

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray三者類似,所以以AtomicIntegerArray為例,我們可以將下面AtomicIntegerArray看做是AtomicInteger型別的陣列,其底層很類似,就不詳細寫了。

AtomicIntegerArray  array = new AtomicIntegerArray(10);
array.getAndIncrement(0);   // 將第0個元素原子地增加1

 

AtomicInteger[]  array = new AtomicInteger[10];
array[0].getAndIncrement();  // 將第0個元素原子地增加1

 

欄位更新器和原子累加器比較簡單,這裡就不說了。 

Atomic進階篇分界線

LongAdder原始碼分析

LongAdder使用

LongAdder是jdk1.8之後新加的,那為什麼要加他?這個問題,下面將回答,我們先看下如何使用。

public class testLongAdder {
    public static void main(String[] args) throws Exception {
        LongAdder number = new LongAdder();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    number.add(1L);
                }
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("number:" + number);
    }
}

 

我們可以看到LongAdder的使用和AtomicLong大致相同,使用兩個執行緒Thread1,Thread2對number值各進行一萬次的自增操作,最後的number是正確的兩萬。

與Atomic的對比優勢

那問題來了,既然AtomicLong能夠完成對多執行緒下的number進行執行緒安全的操作,那為什麼還要LongAdder?我們先來段程式碼比較下,兩個在結果都是正確的前提下,效能方面的差距。

public class testLongAdder {
    public static void main(String[] args) {
       //1個執行緒,進行100萬次自增操作
        test1(1,1000000);
      //10個執行緒,進行100萬次自增操作
        test1(10,1000000);
     //100個執行緒,進行100萬次自增操作
        test1(100,1000000);
    }

    static void test1(int threadCount,int times){
        long startTime=System.currentTimeMillis();
        AtomicLong number1=new AtomicLong();
        List<Thread> threads1=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads1.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number1.incrementAndGet();
                    }
                }
            }));
        }
        threads1.forEach(thread -> thread.start());
        threads1.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        long endTime=System.currentTimeMillis();
        System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime));

        LongAdder number2=new LongAdder();
        List<Thread> threads2=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads2.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number2.add(1);
                    }
                }
            }));
        }
        threads2.forEach(thread -> thread.start());
        threads2.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime));

    }
} 

 

上述程式碼對比了1個執行緒,10個執行緒,100個執行緒在進行100百次自增操作後,AtomicLong和LongAdder所花費的時間。通過列印語句,我們發現在最終number1和number2都正確的基礎上,LongAdder花費的時間比AtomicLong少了一個量級。

原始碼分析

那為什麼會導致這種情況,我們就要從原始碼層面分析。AtomicLong為什麼效率低?因為如果執行緒數量一多,尤其在高併發的情況下,比如有100個執行緒同時想要對物件進行操作,肯定只有一個執行緒會獲取到鎖,其他99個執行緒可能空轉,一直迴圈知道執行緒釋放鎖。如果該執行緒操作完畢釋放了鎖,其他99個執行緒再次競爭,也只有一個執行緒獲取鎖,另外98個執行緒還是空轉,直到鎖被釋放。這樣CAS操作會浪費大量資源在空轉上,從而使得AtomicLong線上程數越來越多的情況下越來越慢。

AtomicLong是多個執行緒對同一個value值進行操作,導致多個執行緒自旋次數太多,效能降低。而LongAdder在無競爭的情況,跟AtomicLong一樣,對同一個base進行操作,當出現競爭關係時則是採用化整為零的做法,從空間換時間,用一個數組cells,將一個value拆分進這個陣列cells。多個執行緒需要同時對value進行操作時候,可以對執行緒id進行hash得到hash值,再根據hash值對映到這個陣列cells的某個下標,再對該下標所對應的值進行自增操作。當所有執行緒操作完畢,將陣列cells的所有值和無競爭值base都加起來作為最終結果。

我們先看下LongAdder裡面的欄位,發現其裡面沒有,主要是在其繼承的Stripped64類中,有下面四個主要變數。

 /** CPU數量,即cells陣列的最大長度*/
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     *cells陣列,為2的冪,2,4,8,16.....,方便以後位運算
     */
    transient volatile Cell[] cells;

    /**
     * 基值,主要用於沒有競爭的情況,通過CAS更新。
     */
    transient volatile long base;

    /**
     * 調整單元格大小(擴容),建立單元格時使用的鎖。
     */
    transient volatile int cellsBusy;

 

 

下面是add方法開始。

 public void add(long x) {
        //as:cells陣列的引用
        //b:base的基礎值
        //v:期望值
        //m:cells陣列大小
        //a:當前陣列命中的單元
        Cell[] as; long b, v; int m; Cell a;
        //as不為空(cells已經初始化過,說明之前有其他執行緒對初始化)或者CAS操作不成功(執行緒間出現競爭)
        if ((as = cells) != null || !casBase(b = base, b + x)) {
        //初始化uncontented,true表示未競爭(因為有兩個情況,這裡先初始化,後面對其修改,就能區分這兩種情況)
        boolean uncontended = true;
        //as等於null(cells未初始化)
        //或者執行緒id雜湊出來的下標所對應的值為空(cell等於空),getProbe() & m功能是獲取下標,底層邏輯是位運算
        //或者更新失敗為false,即發生競爭,取非就為ture
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
        //進入到if裡面,說明更新case失敗,或者更新某個cell也失敗了,或者cell為空,或者cells為空
                longAccumulate(x, null, uncontended);
        }
    }

 

從LongAdder呼叫Stripped64的longAccumulate方法,主要是初始化cellscells的擴容多個執行緒同時命中一個cell的競爭操作。

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //x:add的值,fn:為null,wasUncontended:是否發生競爭,true為發生競爭,false為不發生競爭
        int h;//執行緒的hash值
        //如果該執行緒為0,即第一次進來,所以ThreadLocalRandom強制初始化執行緒id,再對其hash
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            wasUncontended = true;
        }
        //擴容意向,為false肯定不擴容,為true可能擴容
        boolean collide = false; 
        //死迴圈               
        for (;;) {
            //as:cells陣列的引用
            //a:當前執行緒命中的cell
            //n:cells的長度
            //v:當前執行緒命中的cell所擁有的value值
            Cell[] as; Cell a; int n; long v;
            //cells不為空
            if ((as = cells) != null && (n = as.length) > 0) {
                //當前執行緒命中的cell為空,下面邏輯是新增cell
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //發生競爭
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //沒有競爭,嘗試修改當前執行緒對應的cell值,成功跳出迴圈
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //如果n大於CPU最大數量,不可擴容
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //獲取到了鎖,進行擴容,為2的冪,
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];//左移一位運算子,數量加倍
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //cells等於空,並且獲取到鎖,開始初始化工作,建立結束釋放鎖,繼續迴圈
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

 

 

結語

結束了,撒花。這篇主要說了Atomic的一些使用,包括Atomic原子類(AtomicInteger,AtomicLong,AtomicBoolean),Atomic原子引用(AtomicReference,AtomicStampedReference),以及1.8之後LongAdder的優勢,原始碼分析。過程還穿插了一些CAS,ABA問題引入和解決方式。

 

參考資料

Java多執行緒進階(十七)—— J.U.C之atomic框架:LongAdder

CAS原理

Java 8 Performance Improvements: LongAdder vs AtomicLong

AtomicInteger深入理解

原子操作類AtomicInteger詳解

玩轉Java併發工具,精通JUC,成為併發多