1. 程式人生 > 其它 >執行緒安全性-原子性之Atomic包

執行緒安全性-原子性之Atomic包

先了解什麼是執行緒安全性:當多個執行緒訪問某個類時,不管執行時環境採用何種排程方式或者這些程序將如何交替執行,並且在主調程式碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那麼就稱為這個類是執行緒安全的。

執行緒安全性主要體現在三個方面:

1.原子性:提供了互斥訪問,同一時刻只能有一個執行緒來對它進行操作

2.可見性:一個執行緒對主記憶體的修改可以及時的被其他執行緒觀察到

3.有序性:一個執行緒觀察其他執行緒中的指令執行順序,由於指令 重排序的存在,該觀察結果一般雜亂無序

原子性-Atomic包:他們都是通過CAS來完成原子性的;

先看一段程式碼:

package com.example.concurrency;

import com.example.concurrency.annotation.NotThreadSafr;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author xiaozhuang
 * @date 2022年04月07日 11:55
 */
@Slf4j
@NotThreadSafr  // 執行緒不安全例子
public class ConcurrencyTest {
    // 請求訪問總數
    public static int clientTotal = 5000;
    // 同時併發執行的執行緒數
    public static int threadTotal = 200;
    // 計數的值
    public static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        // 執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 訊號量  引數為  執行併發的數目
        final Semaphore semaphore = new Semaphore(threadTotal);
        // 遞減計數器  引數為 請求數量  沒執行成功一次會  減1
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            // 講請求放入執行緒池中
            executorService.execute(() -> {
                try {
                    //判斷訊號量 判斷當前執行緒是否允許被執行
                    semaphore.acquire();
                    add();
                    // 釋放程序
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                // 沒執行完一次 計算器-1
                countDownLatch.countDown();
            });

        }
        countDownLatch.await();
        // 關閉執行緒池
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        count++;
    }

}

執行時得到的值不準確,達不到預期的5000;

但是我們吧int型別改成Atomic類時,就達到預期的值了,也就是執行緒安全;

package com.example.concurrency.example.count;

import com.example.concurrency.annotation.NotThreadSafr;
import com.example.concurrency.annotation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author xiaozhuang
 * @date 2022年04月07日 11:55
 */
@Slf4j
@ThreadSafe  // 執行緒安全例子
public class CountEample2 {
    // 請求訪問總數
    public static int clientTotal = 5000;
    // 同時併發執行的執行緒數
    public static int threadTotal = 200;
    // 計數的值
    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        // 執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 訊號量  引數為  執行併發的數目
        final Semaphore semaphore = new Semaphore(threadTotal);
        // 遞減計數器  引數為 請求數量  沒執行成功一次會  減1
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            // 講請求放入執行緒池中
            executorService.execute(() -> {
                try {
                    //判斷訊號量 判斷當前執行緒是否允許被執行
                    semaphore.acquire();
                    add();
                    // 釋放程序
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                // 沒執行完一次 計算器-1
                countDownLatch.countDown();
            });

        }
        countDownLatch.await();
        // 關閉執行緒池
        executorService.shutdown();
        // count.get() 獲取當前的值
        log.info("count:{}", count.get());
    }

    private static void add() {
        // 先新增操作再獲取值
        count.incrementAndGet();
        // 先獲取值再新增操作
        //count.getAndIncrement();
    }

}

原因是Atomic的incrementAndGet()方法;點進去看到  

this為當前物件,valueOffset值偏移

以原子方式將當前值加一;Unsafe.compareAndSwapInt方法處理此問題

再點進去getAndAddInt方法,是下面這段程式碼:

而CAS就是compareAndSwapInt的縮寫;

被native標識的方法,也就是java底層的方法;

現在對這段程式碼解析一波:首先傳過來的    ①:var1 就是我們之前傳過來的count物件    ②:var2 是當前的值    ③:var4 的值就是1  ④:var5 是通過底層方法獲取底層當前的值;

假設現在要做的操作是2+1:var1 就是我們之前傳過來的count物件; 那麼var2就是2, var4是1;如果沒有別的執行緒來處理var1 物件的時候,這時候  var5返回就是2,也就是this.getIntVolatie(var1,var2)得到的值是2;

然後是compareAndSwapInt這個方法的呼叫:var1-當前count物件,var2-值為2,var5-值為底層傳過來的2,var4-值為1;後面的var5+var4 (var5為底層取出來的值,var4要增加的值1),等於3;

當前方法希望達成的目標是,對這個count物件,當前的var2的值與底層的var5的值相同的情況下,把它更新成後面的值(也就是var5+var4);

當我們剛進來這個方法的時候,var2的值為2,再進行更新變成3的時候,可能會被別的執行緒更改,所有他會判斷期望的值,也就是var2的值等於var5的時候,才會更新成後面的值,如果不相等,則會重新再var1這個物件中取出var2這個值,再去跟var5對比,相等就做後面的操作;

再舉個例子:在進行2+1操作的時候,剛進來var2的值為2,在進行更新變成3的時候,被其他執行緒所更改了,此時底層的var5的值就是3了,當前執行緒拿var2來跟var5對比(也就是2與3對比),是不相等的;那麼它就是再去var1這個物件中把var2值取出來,此時取出來的就是3了(假設期間沒被其他執行緒更改),var2與var5都是3,比對相等,然後進行後面的更新,var5+var4--> 3+1=4;

下面介紹一下Atomic包中 AtomicRerence類的使用

package com.example.concurrency.example.atomic;

import com.example.concurrency.annotation.NotThreadSafr;
import com.example.concurrency.annotation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;


@Slf4j
@ThreadSafe
public class AtomicEample1 {
    // AtomicReference類
   private static AtomicReference<Integer>count=new AtomicReference<>(0);

    public static void main(String[] args) {
        // 值為0時,賦值為2   下面的以此類推
        count.compareAndSet(0,2);
        count.compareAndSet(0,1);
        count.compareAndSet(1,3);
        count.compareAndSet(2,4);
        count.compareAndSet(3,5);
        log.info("count:{}",count.get());
    }

}

輸出為:4--->首先是0 更新為2, 當其為2時,更新為4;其他的因為第一個引數對不上,所有不會執行。

Atomic包中 AtomicIntegerFieldUpdater類的使用:

package com.example.concurrency.example.atomic;

import com.example.concurrency.annotation.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

@Slf4j
@ThreadSafe
public class AtomicEample2 {
    // 針對integer的操作  泛型為需要更新的物件
    // 第一個引數為類的class,第二引數必須由 volatile修飾的名字  就是下面的count屬性
    private static AtomicIntegerFieldUpdater<AtomicEample2> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicEample2.class, "count");
    @Getter
    public volatile int count = 100;

    private static AtomicEample2 eample2 = new AtomicEample2();

    public static void main(String[] args) {
        // 如果當前物件的屬性值為100  就修改為150
        if (updater.compareAndSet(eample2, 100, 150)) {
            log.info("updater success one:{}", eample2.getCount());
        }
        // 如果當前物件的屬性值為200  就修改為150
        if (updater.compareAndSet(eample2, 200, 150)) {
            log.info("updater success two:{}", eample2.getCount());
        } else {
            log.info("updater failed:{}", eample2.getCount());
        }
    }
}

它是以原子性去更新某個類的例項指定的某個欄位,這裡也就是count欄位,並且這個欄位必須被volatile修飾且不能是static的;

輸出為:updater success one:150與 updater failed:150--->值為100更新為150;  值已經是150,固if不成立走else輸出150;

再介紹一下AtomicStampReference類關於CAS的ABA問題:

首先什麼是CAS的ABA:就是本執行緒在進行CAS操作的時候,其他執行緒將變數的值從A改成了B,然後又改回了A;本執行緒在使用期望值A去與當前變數比較時,發現A變成沒有變,於是CAS就將A值進行了交換操作;此時,實際該值已經被其他執行緒改變過了。

CAS的解決思路:每次變數更新的時候將變數的版本號進行+1操作,那麼之前的就是A是1版本-->改成 B是2版本-->再改回 A是3版本;只有變數被某個執行緒修改過,版本號就會發生遞增變化,從而解決了ABA問題。

package com.example.concurrency.example.atomic;

import com.example.concurrency.annotation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * @author xiaozhuang
 * @date 2022年04月07日 22:31
 */
@Slf4j
@ThreadSafe
public class AtomicEample3 {
    // initalRef為初始值  initialStamp為初始版本號
    private static AtomicStampedReference<Integer> stampedReference =
            new AtomicStampedReference<>(100, 1);

    public static void main(String[] args) {
        new Thread(() -> {
            log.info("張三查-初始值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
            stampedReference.compareAndSet(100, 127, stampedReference.getStamp(),
                    stampedReference.getStamp() + 1);
            log.info("當前值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
            stampedReference.compareAndSet(127, 100, stampedReference.getStamp(),
                    stampedReference.getStamp() + 1);
            log.info("當前值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
        }, "小明").start();
        new Thread(() -> {
            try {
                Thread.sleep(200);
                log.info("李四查-初始值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
                boolean isSuccess = stampedReference.compareAndSet(100, 126, stampedReference.getStamp(),
                        stampedReference.getStamp() + 1);
                log.info("是否更新成功:{},當前值為:{},版本號為:{}", isSuccess ? "success" : "failed", stampedReference.getReference(), stampedReference.getStamp());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        },"李四").start();
    }
}

點進去compareAndSet方法中

四個引數:① 預期值  ②更新的值 ③預期版本號 ④更新的版本號

對於這份Pair,點進去發現它是AtomicStampedReference的內部類,發現它裡面含有物件的值和版本號

可以看到compareAndSet返回的是更新是否成功的布林值,那麼它是在哪裡進行更新操作的呢?可以看到這個方法裡最底下的 casPair方法,點進去可以看到它是用底層的CAS將值和版本號進行更新的,而CAS怎麼做的,在上面已經分析過了。

 再介紹一下AtomicLongArrayBoolean類:

package com.example.concurrency.example.atomic;

import com.example.concurrency.annotation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;


@Slf4j
@ThreadSafe
public class AtomicEample4 {
    // AtomicBoolean   引數為 預設值
    private static AtomicBoolean isHappend = new AtomicBoolean(false);
    // 請求訪問總數
    public static int clientTotal = 5000;
    // 同時併發執行的執行緒數
    public static int threadTotal = 200;

    public static void main(String[] args) throws InterruptedException {
        // 執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 訊號量  引數為  執行併發的數目
        final Semaphore semaphore = new Semaphore(threadTotal);
        // 遞減計數器  引數為 請求數量  沒執行成功一次會  減1
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            // 講請求放入執行緒池中
            executorService.execute(() -> {
                try {
                    //判斷訊號量 判斷當前執行緒是否允許被執行
                    semaphore.acquire();
                    test();
                    // 釋放程序
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                // 沒執行完一次 計算器-1
                countDownLatch.countDown();
            });

        }
        countDownLatch.await();
        // 關閉執行緒池
        executorService.shutdown();
        log.info("isHappend:{}", isHappend.get());
    }

    private static void test() {
        // 預期值是false   更新為true
        if (isHappend.compareAndSet(false, true)) {
            log.info("update sueccess");
        }
    }
}

可以看到我們請求總數為5000,併發數200,但是這段程式碼只輸出了一次,原因是 第一次是false,與預期值一致,改為true了;那麼接下來不管判斷再多次,它都是true,永遠跟期望值為false不同,所以永遠也修改不了了;在實際應用中,讓一段程式碼只執行一次,不會重複,可以使用它來做。

順帶說一下AtomicLong 與LongAddr的區別以及AtomicLongArray的使用:

LongAddr的優點:LongAdder在AtomicLong的基礎上將單點的更新壓力分散到各個節點,在併發量低的時候通過對一個base的值的直接更新可以很好的保                                障和AtomicLong的效能基本保持一致,而在併發量高的時候則是通過分散提高了效能。
                   缺點:LongAdder在統計的時候如果有併發更新,可能導致統計的資料有誤差。
AtomicLongArray:與其他Atomic類基本差不多,不同的是多了個Array,在進行取值,更新操作時,都要根據索引來維護。