執行緒安全性-原子性之Atomic包
先了解什麼是執行緒安全性:當多個執行緒訪問某個類時,不管執行時環境採用何種排程方式或者這些程序將如何交替執行,並且在主調程式碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那麼就稱為這個類是執行緒安全的。
執行緒安全性主要體現在三個方面:
1.原子性:提供了互斥訪問,同一時刻只能有一個執行緒來對它進行操作
2.可見性:一個執行緒對主記憶體的修改可以及時的被其他執行緒觀察到
3.有序性:一個執行緒觀察其他執行緒中的指令執行順序,由於指令 重排序的存在,該觀察結果一般雜亂無序
原子性-Atomic包:他們都是通過CAS來完成原子性的;
先看一段程式碼:
package com.example.concurrency; import com.example.concurrency.annotation.NotThreadSafr; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author xiaozhuang * @date 2022年04月07日 11:55 */ @Slf4j @NotThreadSafr // 執行緒不安全例子 public class ConcurrencyTest { // 請求訪問總數 public static int clientTotal = 5000; // 同時併發執行的執行緒數 public static int threadTotal = 200; // 計數的值 public static int count = 0; public static void main(String[] args) throws InterruptedException { // 執行緒池 ExecutorService executorService = Executors.newCachedThreadPool(); // 訊號量 引數為 執行併發的數目 final Semaphore semaphore = new Semaphore(threadTotal); // 遞減計數器 引數為 請求數量 沒執行成功一次會 減1 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { // 講請求放入執行緒池中 executorService.execute(() -> { try { //判斷訊號量 判斷當前執行緒是否允許被執行 semaphore.acquire(); add(); // 釋放程序 semaphore.release(); } catch (Exception e) { log.error("exception", e); } // 沒執行完一次 計算器-1 countDownLatch.countDown(); }); } countDownLatch.await(); // 關閉執行緒池 executorService.shutdown(); log.info("count:{}", count); } private static void add() { count++; } }
執行時得到的值不準確,達不到預期的5000;
但是我們吧int型別改成Atomic類時,就達到預期的值了,也就是執行緒安全;
package com.example.concurrency.example.count; import com.example.concurrency.annotation.NotThreadSafr; import com.example.concurrency.annotation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; /** * @author xiaozhuang * @date 2022年04月07日 11:55 */ @Slf4j @ThreadSafe // 執行緒安全例子 public class CountEample2 { // 請求訪問總數 public static int clientTotal = 5000; // 同時併發執行的執行緒數 public static int threadTotal = 200; // 計數的值 public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { // 執行緒池 ExecutorService executorService = Executors.newCachedThreadPool(); // 訊號量 引數為 執行併發的數目 final Semaphore semaphore = new Semaphore(threadTotal); // 遞減計數器 引數為 請求數量 沒執行成功一次會 減1 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { // 講請求放入執行緒池中 executorService.execute(() -> { try { //判斷訊號量 判斷當前執行緒是否允許被執行 semaphore.acquire(); add(); // 釋放程序 semaphore.release(); } catch (Exception e) { log.error("exception", e); } // 沒執行完一次 計算器-1 countDownLatch.countDown(); }); } countDownLatch.await(); // 關閉執行緒池 executorService.shutdown(); // count.get() 獲取當前的值 log.info("count:{}", count.get()); } private static void add() { // 先新增操作再獲取值 count.incrementAndGet(); // 先獲取值再新增操作 //count.getAndIncrement(); } }
原因是Atomic的incrementAndGet()方法;點進去看到
this為當前物件,valueOffset值偏移
以原子方式將當前值加一;Unsafe.compareAndSwapInt方法處理此問題
再點進去getAndAddInt方法,是下面這段程式碼:
而CAS就是compareAndSwapInt的縮寫;
被native標識的方法,也就是java底層的方法;
現在對這段程式碼解析一波:首先傳過來的 ①:var1 就是我們之前傳過來的count物件 ②:var2 是當前的值 ③:var4 的值就是1 ④:var5 是通過底層方法獲取底層當前的值;
假設現在要做的操作是2+1:var1 就是我們之前傳過來的count物件; 那麼var2就是2, var4是1;如果沒有別的執行緒來處理var1 物件的時候,這時候 var5返回就是2,也就是this.getIntVolatie(var1,var2)得到的值是2;
然後是compareAndSwapInt這個方法的呼叫:var1-當前count物件,var2-值為2,var5-值為底層傳過來的2,var4-值為1;後面的var5+var4 (var5為底層取出來的值,var4要增加的值1),等於3;
當前方法希望達成的目標是,對這個count物件,當前的var2的值與底層的var5的值相同的情況下,把它更新成後面的值(也就是var5+var4);
當我們剛進來這個方法的時候,var2的值為2,再進行更新變成3的時候,可能會被別的執行緒更改,所有他會判斷期望的值,也就是var2的值等於var5的時候,才會更新成後面的值,如果不相等,則會重新再var1這個物件中取出var2這個值,再去跟var5對比,相等就做後面的操作;
再舉個例子:在進行2+1操作的時候,剛進來var2的值為2,在進行更新變成3的時候,被其他執行緒所更改了,此時底層的var5的值就是3了,當前執行緒拿var2來跟var5對比(也就是2與3對比),是不相等的;那麼它就是再去var1這個物件中把var2值取出來,此時取出來的就是3了(假設期間沒被其他執行緒更改),var2與var5都是3,比對相等,然後進行後面的更新,var5+var4--> 3+1=4;
下面介紹一下Atomic包中 AtomicRerence類的使用:
package com.example.concurrency.example.atomic;
import com.example.concurrency.annotation.NotThreadSafr;
import com.example.concurrency.annotation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@ThreadSafe
public class AtomicEample1 {
// AtomicReference類
private static AtomicReference<Integer>count=new AtomicReference<>(0);
public static void main(String[] args) {
// 值為0時,賦值為2 下面的以此類推
count.compareAndSet(0,2);
count.compareAndSet(0,1);
count.compareAndSet(1,3);
count.compareAndSet(2,4);
count.compareAndSet(3,5);
log.info("count:{}",count.get());
}
}
輸出為:4--->首先是0 更新為2, 當其為2時,更新為4;其他的因為第一個引數對不上,所有不會執行。
Atomic包中 AtomicIntegerFieldUpdater類的使用:
package com.example.concurrency.example.atomic;
import com.example.concurrency.annotation.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@Slf4j
@ThreadSafe
public class AtomicEample2 {
// 針對integer的操作 泛型為需要更新的物件
// 第一個引數為類的class,第二引數必須由 volatile修飾的名字 就是下面的count屬性
private static AtomicIntegerFieldUpdater<AtomicEample2> updater =
AtomicIntegerFieldUpdater.newUpdater(AtomicEample2.class, "count");
@Getter
public volatile int count = 100;
private static AtomicEample2 eample2 = new AtomicEample2();
public static void main(String[] args) {
// 如果當前物件的屬性值為100 就修改為150
if (updater.compareAndSet(eample2, 100, 150)) {
log.info("updater success one:{}", eample2.getCount());
}
// 如果當前物件的屬性值為200 就修改為150
if (updater.compareAndSet(eample2, 200, 150)) {
log.info("updater success two:{}", eample2.getCount());
} else {
log.info("updater failed:{}", eample2.getCount());
}
}
}
它是以原子性去更新某個類的例項指定的某個欄位,這裡也就是count欄位,並且這個欄位必須被volatile修飾且不能是static的;
輸出為:updater success one:150與 updater failed:150--->值為100更新為150; 值已經是150,固if不成立走else輸出150;
再介紹一下AtomicStampReference類關於CAS的ABA問題:
首先什麼是CAS的ABA:就是本執行緒在進行CAS操作的時候,其他執行緒將變數的值從A改成了B,然後又改回了A;本執行緒在使用期望值A去與當前變數比較時,發現A變成沒有變,於是CAS就將A值進行了交換操作;此時,實際該值已經被其他執行緒改變過了。
CAS的解決思路:每次變數更新的時候將變數的版本號進行+1操作,那麼之前的就是A是1版本-->改成 B是2版本-->再改回 A是3版本;只有變數被某個執行緒修改過,版本號就會發生遞增變化,從而解決了ABA問題。
package com.example.concurrency.example.atomic;
import com.example.concurrency.annotation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* @author xiaozhuang
* @date 2022年04月07日 22:31
*/
@Slf4j
@ThreadSafe
public class AtomicEample3 {
// initalRef為初始值 initialStamp為初始版本號
private static AtomicStampedReference<Integer> stampedReference =
new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
new Thread(() -> {
log.info("張三查-初始值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
stampedReference.compareAndSet(100, 127, stampedReference.getStamp(),
stampedReference.getStamp() + 1);
log.info("當前值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
stampedReference.compareAndSet(127, 100, stampedReference.getStamp(),
stampedReference.getStamp() + 1);
log.info("當前值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
}, "小明").start();
new Thread(() -> {
try {
Thread.sleep(200);
log.info("李四查-初始值為:{},版本號為:{}", stampedReference.getReference(), stampedReference.getStamp());
boolean isSuccess = stampedReference.compareAndSet(100, 126, stampedReference.getStamp(),
stampedReference.getStamp() + 1);
log.info("是否更新成功:{},當前值為:{},版本號為:{}", isSuccess ? "success" : "failed", stampedReference.getReference(), stampedReference.getStamp());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"李四").start();
}
}
點進去compareAndSet方法中
四個引數:① 預期值 ②更新的值 ③預期版本號 ④更新的版本號
對於這份Pair,點進去發現它是AtomicStampedReference的內部類,發現它裡面含有物件的值和版本號
可以看到compareAndSet返回的是更新是否成功的布林值,那麼它是在哪裡進行更新操作的呢?可以看到這個方法裡最底下的 casPair方法,點進去可以看到它是用底層的CAS將值和版本號進行更新的,而CAS怎麼做的,在上面已經分析過了。
再介紹一下AtomicLongArrayBoolean類:
package com.example.concurrency.example.atomic;
import com.example.concurrency.annotation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@ThreadSafe
public class AtomicEample4 {
// AtomicBoolean 引數為 預設值
private static AtomicBoolean isHappend = new AtomicBoolean(false);
// 請求訪問總數
public static int clientTotal = 5000;
// 同時併發執行的執行緒數
public static int threadTotal = 200;
public static void main(String[] args) throws InterruptedException {
// 執行緒池
ExecutorService executorService = Executors.newCachedThreadPool();
// 訊號量 引數為 執行併發的數目
final Semaphore semaphore = new Semaphore(threadTotal);
// 遞減計數器 引數為 請求數量 沒執行成功一次會 減1
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
// 講請求放入執行緒池中
executorService.execute(() -> {
try {
//判斷訊號量 判斷當前執行緒是否允許被執行
semaphore.acquire();
test();
// 釋放程序
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
// 沒執行完一次 計算器-1
countDownLatch.countDown();
});
}
countDownLatch.await();
// 關閉執行緒池
executorService.shutdown();
log.info("isHappend:{}", isHappend.get());
}
private static void test() {
// 預期值是false 更新為true
if (isHappend.compareAndSet(false, true)) {
log.info("update sueccess");
}
}
}
可以看到我們請求總數為5000,併發數200,但是這段程式碼只輸出了一次,原因是 第一次是false,與預期值一致,改為true了;那麼接下來不管判斷再多次,它都是true,永遠跟期望值為false不同,所以永遠也修改不了了;在實際應用中,讓一段程式碼只執行一次,不會重複,可以使用它來做。
順帶說一下AtomicLong 與LongAddr的區別以及AtomicLongArray的使用:
LongAddr的優點:LongAdder在AtomicLong的基礎上將單點的更新壓力分散到各個節點,在併發量低的時候通過對一個base的值的直接更新可以很好的保 障和AtomicLong的效能基本保持一致,而在併發量高的時候則是通過分散提高了效能。
缺點:LongAdder在統計的時候如果有併發更新,可能導致統計的資料有誤差。
AtomicLongArray:與其他Atomic類基本差不多,不同的是多了個Array,在進行取值,更新操作時,都要根據索引來維護。