JAVA 併發與高併發知識筆記(二)
一、併發安全、不安全描述
安全:多個執行緒操作同一個資源,最後的執行結果與單執行緒執行結果一致,則說明是執行緒安全的
不安全:多個執行緒操作同一個資源,最後執行結果不確定的,則說明不是執行緒安全的
這裡我覺得還是解釋一下併發與並行的一點區別比好(並非絕對概念),併發通常是多個執行緒去競爭相同資源,而並行通常是多個執行緒之間是協作關係,例如,在秒殺場景下,多個使用者(執行緒)共同爭搶某個資源,這個是併發。例如,多個執行緒統計一個幾千萬資料的檔案,這個時候執行緒之間是協作關係,每個執行緒各自統計分配的一段資料,最後彙總給主執行緒。
二、常見併發模擬工具以及程式碼模擬併發(工具使用之後再單獨學習)
a) Postman :http 請求模擬工具
b) AB (Apache Bench) :Apache 附帶的模擬工具,主要用來測試網站效能
c) JMeter : Apache 組織開發的壓力測試工具
d) 使用 CountDownLatch、Semaphore 進行併發模擬 (在筆記一中已經提到)
三、程式碼模擬併發
a) CountDownLatch 介紹:該類為一個計數器,字面意思就是向下減的一個閉鎖類
上圖解釋:
TA 為主執行緒,主執行緒初始化 CountDownLatch 計數器為3,T1~3 為子執行緒,主執行緒呼叫CountDownLatch的 await 後就開始阻塞,直到T1~3 呼叫 CountDownLatch 的 countDown() 方法將計數器減為0,然後主執行緒繼續執行。
b) Semaphore 介紹:
字面意思是訊號量,它可以控制同一時間內有多少個執行緒可以執行,比如我們常見的馬路,有4車道,8車道,可以把這裡的車道比作執行緒,4車道相當於4個執行緒同時執行,8車道想相當於8個執行緒執行。semaphore 就是好比是這個車道,可以指定有多個車道,從對訊號量的功能描述,可以想到在實際開發中可以用來限制同一時間請求介面的次數,通常semaphore 會與執行緒池配合使用。
c) 併發模擬程式碼(Not thread safe)
d ) 併發模擬daim(Thread safe)/** * 併發模擬 * * @author Aaron * */ @NotThreadSafe @Slf4j public class ConcurrencyTest1 { // 模擬 1000個使用者請求 private final static int TotalClient = 1000; // 限制同一時間只能有10個執行緒執行 private final static int TotalThread = 10; // 計數器 private static int count = 0; public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); // 設定訊號量,允許同時最多執行的執行緒數 final Semaphore sp = new Semaphore(TotalThread); final CountDownLatch cdl = new CountDownLatch(TotalClient); for (int i = 0; i < TotalClient; i++) { es.execute(new Runnable() { @Override public void run() { try { sp.acquire(); add(); sp.release(); } catch (InterruptedException e) { log.error("A", e); } cdl.countDown(); } }); } // 中斷主線層程式碼,直至countdownlatch 的計數器變為0 cdl.await(); es.shutdown(); log.info(String.valueOf(count)); } @NotThreadSafe public static void add() { count++; } }
/**
* 併發模擬
*
* @author Aaron
*
*/
@ThreadSafe
@Recommend
@Slf4j
public class ConcurrencyTest2 {
// 模擬 1000個使用者請求
private final static int TotalClient = 1000;
// 限制同一時間只能有10個執行緒執行
private final static int TotalThread = 10;
// 使用原子類
private final static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
// 設定訊號量,允許同時最多執行的執行緒數
final Semaphore sp = new Semaphore(TotalThread);
final CountDownLatch cdl = new CountDownLatch(TotalClient);
for (int i = 0; i < TotalClient; i++) {
es.execute(new Runnable() {
@Override
public void run() {
try {
sp.acquire();
add();
sp.release();
} catch (InterruptedException e) {
log.error("A", e);
}
cdl.countDown();
}
});
}
// 中斷主線層程式碼,直至countdownlatch 的計數器變為0
cdl.await();
es.shutdown();
log.info(String.valueOf(count.get()));
}
@ThreadSafe
public static void add() {
count.incrementAndGet();
}
}
四、類的執行緒安全性定義
當多個執行緒同時訪問一個類時,不管執行時環境採用何種方式呼叫或者這些執行緒如何交替執行, 並且在主調程式碼中不需要做額外的同步或協同操作,這個類始終表現出正確的行為,那麼這個類就是執行緒安全的。
五、執行緒安全的主要體現點
a) 原子性:
原子性可以解釋為互斥性訪問,既同一時刻,只能有一個執行緒進行操作
b) 可見性:
某個執行緒對主記憶體的修改,其它執行緒必須能及時觀察到
c) 有序性:
某個執行緒觀察其它執行緒中的指令執行順序,由於指令重排序的存在,通常觀察到的是雜亂無序的
六、CAS 原理 (以 AtomicInteger 為參考)
由於學習發現我的 eclipse 看不到 unsafe 原始碼,其它原始碼可以看到,所以特意安裝了反編譯外掛(Decompiler)
地址:https://www.cnblogs.com/godtrue/p/5499785.html
a ) CAS 是 unsafe 中的 compareAndSwapInt 方法的縮寫
b) 原理,在 AtomicInteger 的 incrementAndGet 方法裡呼叫了 unsafe 中的 getAndAddInt 方法,在該方法中,核心的方法是 compareAdnSwapInt 方法,核心原理是通過物件以及值的記憶體地址取出當前值,然後再進行比較,如果比較是值發生了更改則重新取出最新的值再繼續比較,直到比較成功,然後更新值。
// 標記為 native 的方法,說明不是用java實現的,通常是由C、C++ 等等實現的
// 第一個引數是當前物件,第二個引數是值所對應的記憶體地址
public native int getIntVolatile(Object arg0, long arg1);
// 也是標記為 native 的方法,也是核心方法
// 第一個引數是當前物件,第二個引數是值所對應的記憶體地址,第三個引數為記憶體值,第四個引數為準備更新的值
public final native boolean compareAndSwapInt(Object arg0, long arg1, int arg3, int arg4);
// AtomicInteger 中呼叫的是該方法
// 第一個引數是當前物件,第二個引數是值的記憶體地址,第三個引數為增加量
public final int getAndAddInt(Object arg0, long arg1, int arg3) {
int arg4;
do {
// 取出當前記憶體值(預期值)
arg4 = this.getIntVolatile(arg0, arg1);
// 比較當前記憶體值是否與預期值相等,如果不相等則繼續比較,如果相等則返回當前的記憶體值
// 如果預期值 與 arg1 指向的值一樣,則更新為 arg4 + arg3
// 如果不一樣則繼續迴圈,直到完成更新
} while (!this.compareAndSwapInt(arg0, arg1, arg4, arg4 + arg3));
return arg4;
}
c) CAS 缺點
分析CAS原始碼後可以發現,如果大量執行緒進行CAS操作,那麼競爭就會很激烈,導致一部分執行緒由於總是比較失敗而長時間停留在迴圈體中,可能會有瞬間或一段時間的CPU過載,影響系統性能。
d) (JDK1.8 新增 ) LongAdder 與 DoubleAdder 處理思想
對於普通型別的 long 或 double 型別的變數,JVM 允許將64位的讀操作或寫操作拆分成兩個32位的讀寫操作,該處理方式的主要思想是將熱點資料分離,將內部 Value 分離成一個Cell 陣列,當多個執行緒訪問時通過HASH等演算法將執行緒對映到其中一個Cell 上進行操作,最終的計算結果則是Cell 陣列的求和值,當低併發的時候,演算法會直接更新變數的值,在高併發的時候通過分散操作Cell 提高效能,當然缺點也是有的,當併發更改以及呼叫sum操作時,sum統計的值可能不準確,以下是原話。
/**
* Returns the current sum. The returned value is <em>NOT</em> an
* atomic snapshot; invocation in the absence of concurrent (意思是在非併發的情況下使用)
* updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be
* incorporated.
*
* @return the sum
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
七、java.util.concurrent.atomic 包
八、AtomicReference 與 AtomicIntegerFieldUpdater
a) AtomicReference 基本使用
該類提供一個泛型引數,用於對多種物件的原子操作(注意是物件的操作,如果傳入原子類,則是對這個原子類本身的原子操作,並非是原子類中資料的原子操作),以下為簡單的示例
@ThreadSafe
@Slf4j
public class AtomicReferenceTest {
private static AtomicReference<Integer> ar = new AtomicReference<Integer>(0);
public static void main(String[] args) {
// 比較更新方法,如果是值是0,則更新為1
log.info("{} -> {} - {}", 0, 1, ar.compareAndSet(0, 1));
// 獲取原先的值,並設定為指定的新值
log.info("{} -> {}", ar.getAndSet(3), ar.get());
// 以下是原始碼實現,核心還是使用的Unsafe類的方法
// /**
// * Atomically sets the value to the given updated value
// * if the current value {@code ==} the expected value.
// * @param expect the expected value
// * @param update the new value
// * @return {@code true} if successful. False return indicates that
// * the actual value was not equal to the expected value.
// */
// public final boolean compareAndSet(V expect, V update) {
// native 原子方法
// return unsafe.compareAndSwapObject(this, valueOffset, expect,
// update);
// }
}
}
b) AtomicIntegerFieldUpdater
基本使用
該類提供一個泛型引數,用於對物件內部的成員變數進行原子操作,以下為簡單的示例
@Slf4j
public class AtomicIntegerFieldUpdaterTest {
private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> a = AtomicIntegerFieldUpdater
.newUpdater(AtomicIntegerFieldUpdaterTest.class, "value");
// 變數必須是 int 基本型別,不能是物件型別
// 變數必須有 volatile 關鍵字修飾
// 以下是原始碼中的判斷
// if (field.getType() != int.class)
// throw new IllegalArgumentException("Must be integer type");
//
// if (!Modifier.isVolatile(modifiers))
// throw new IllegalArgumentException("Must be volatile type");
@Getter
// 未初始化預設是0
private volatile int value;
public static void main(String[] args) {
AtomicIntegerFieldUpdaterTest aifu = new AtomicIntegerFieldUpdaterTest();
// 比較&設定
a.compareAndSet(aifu, 0, 2);
log.info("{}", aifu.getValue());
}
}九、解決 CAS 的ABA問題a) ABA問題解釋:
當多個執行緒操作一個資源時,T1取出值A,T2執行緒也取出值A,這個時候T2執行過程中將A變為B又變回A,然後T1繼續執行發現與自己的值相同,然後進行了更新操作,操作雖然成功,但是這個過程卻是有隱患的,比如對一個單項鍊表操作,T1 取出棧頂A與下一個棧B,想用CAS替換棧頂A為B,在T1執行CAS操作之前,這時候T2取出A和B,然後push了A、C、D,這個時候B屬於獨立的連結串列,處於遊離狀態(當前有兩個連結串列 A->C->D->NULL ,還有一個遊離的 B->NULL),然後T1開始執行CAS操作,發現ACD棧頂還是A,然後開始處理,由於之前已經取出B,當時的B->NULL 這樣的,最後結果是把T2的 CD 給丟了。
b) 為了解決ABA問題,有了 AtomicStampedReference 這個類
該類的核心思想是,每次操作都有個一 version 來記錄,例如 T2 取出A時版本是 1,更新為B後版本號變為2,再更新為A時版本號變為3,此時由於有版本號控制,T1 再來更新A時發現自己的版本號1 與 3 不一致,最後CAS操作失敗。
示例:
@Slf4j
public class ABATest {
// 普通原子類
private static AtomicInteger atomicInt = new AtomicInteger(100);
// 有版本號的實現(引數是初始值與初始版本號)
private static AtomicStampedReference<Integer> atomicStampedRef = new AtomicStampedReference<Integer>(100, 0);
public static void main(String[] args) throws InterruptedException {
// 模擬 B->A
Thread intT1 = new Thread(new Runnable() {
@Override
public void run() {
// A->B
atomicInt.compareAndSet(100, 101);
// B->A
atomicInt.compareAndSet(101, 100);
}
});
// 模擬 A->B
Thread intT2 = new Thread(new Runnable() {
@Override
public void run() {
try {
// 執行緒休眠,給 T1 執行
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// A->B
boolean c3 = atomicInt.compareAndSet(100, 101);
log.info("一般 CAS={}", c3);// 操作成功
}
});
intT1.start();
intT2.start();
intT1.join();
intT2.join();
Thread refT1 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// A-B 版本號 1
atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
// B-A 版本號 2
atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
}
});
Thread refT2 = new Thread(new Runnable() {
@Override
public void run() {
// T2取出版本號
int stamp = atomicStampedRef.getStamp();
log.info("有版本號,執行緒休眠之前:stamp={}", stamp);
try {
// 休眠2秒
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// T1 的版本號
log.info("有版本號,執行緒休眠之後:stamp={}", atomicStampedRef.getStamp());
// A->B ,T1 將版本號增加到了2,然後執行時由於T2
// 持有的版本號還是之前的0,與當前的版本號2不一致,最中CAS操作失敗
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
log.info("有版本號 CAS={}", c3);
}
});
refT1.start();
refT2.start();
}
十、 AtomicBoolean 示例
可以使用該類來保證某項操作只執行一次
@Slf4j
public class AtomicBooleanTest {
private static AtomicBoolean ab = new AtomicBoolean(true);
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
int c = 1000;
int s = 100;
final Semaphore sh = new Semaphore(s);
final CountDownLatch cdl = new CountDownLatch(c);
for (int i = 0; i < c; i++) {
es.execute(new Runnable() {
@Override
public void run() {
try {
sh.acquire();
init();
sh.release();
} catch (InterruptedException e) {
log.error("Error", e);
}
cdl.countDown();
}
});
}
cdl.await();
es.shutdown();
}
private static void init() {
// 個人理解這麼寫會有效率問題,因為每次都要進行CAS比較,應該加一層 if 判斷,如 init2
if (ab.compareAndSet(true, false)) {
log.info(".......init.....OK");
}
}
private static void init2() {
if (ab.get()) {
if (ab.compareAndSet(true, false)) {
log.info(".......init.....OK");
}
}
}
}