1. 程式人生 > >線程安全性詳解

線程安全性詳解

死循環 base 不支持 syn 原子 roc slf4 hang 順序

線程安全性定義:

當多個線程訪問某個類時,不管運行時環境采用何種調度方式或者這些進程將如何交替執行,並且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那麽就稱這個類是線程安全的

線程安全性主要體現在三個方面:

  • 原子性:提供了互斥訪問,同一時刻只能有一個線程來對它進行操作
  • 可見性:一個線程對主內存的修改可以及時的被其他線程觀察到
  • 有序性:一個線程觀察其他線程中的指令執行順序,由於指令重排的存在,該觀察結果一般雜亂無序

線程安全性-原子性-atomic包

說到原子性,就不得不提及JDK裏的atomic包,該包中提供了很多Atomic的類,本小節主要介紹該包中常用的幾個類。這些類都是通過CAS來實現原子性的,atomic包提供了如下具有原子性的類:

技術分享圖片

什麽是CAS:

CAS (compareAndSwap),中文叫比較交換,一種無鎖原子算法。過程是這樣:它包含 3 個參數 CAS(V,E,N),V表示要更新變量的值,E表示預期值,N表示新值。僅當 V值等於E值時,才會將V的值設為N,如果V值和E值不同,則說明已經有其他線程做兩個更新,則當前線程則什麽都不做。最後,CAS 返回當前V的真實值。CAS 操作時抱著樂觀的態度進行的,它總是認為自己可以成功完成操作。

當多個線程同時使用CAS 操作一個變量時,只有一個會勝出,並成功更新,其余均會失敗。失敗的線程不會掛起,僅是被告知失敗,並且允許再次嘗試,當然也允許實現的線程放棄操作。基於這樣的原理,CAS 操作即使沒有鎖,也可以發現其他線程對當前線程的幹擾。

與鎖相比,使用CAS會使程序看起來更加復雜一些,但由於其非阻塞的,它對死鎖問題天生免疫,並且,線程間的相互影響也非常小。更為重要的是,使用無鎖的方式完全沒有鎖競爭帶來的系統開銷,也沒有線程間頻繁調度帶來的開銷,因此,他要比基於鎖的方式擁有更優越的性能。

簡單的說,CAS 需要你額外給出一個期望值,也就是你認為這個變量現在應該是什麽樣子的。如果變量不是你想象的那樣,哪說明它已經被別人修改過了。你就需要重新讀取,再次嘗試修改就好了。

CAS的缺點:

CAS 看起來非常的吊,但是它仍然有缺點,最著名的就是 ABA 問題:在CAS操作的時候,其他線程將變量的值A改成了B,然後又改成了A。本線程使用期望值A與當前變量進行比較的時候,發現A變量沒有變,於是CAS就將A值進行了交換操作。這個時候實際上A值已經被其他線程改變過,這與設計思想是不符合的。

如果只是在基本類型上是沒有問題的,但如果是引用類型呢?這個對象中有多個變量,我怎麽知道有沒有被改過?聰明的你一定想到了,加個版本號啊。每次修改就檢查版本號,如果版本號變了,說明改過。這樣只要變量被某一個線程修改過,該變量版本號就會發生遞增操作,從而解決了ABA問題

CAS 底層原理:

CAS是如何將比較和交換這兩個操作,變成一個原子操作呢?這歸功於硬件指令集的發展,實際上,我們可以使用同步將這兩個操作變成原子的,但是這麽做就沒有意義了。所以我們只能靠硬件來完成,硬件保證一個從語義上看起來需要多次操作的行為只通過一條處理器指令就能完成。這類指令常用的有:

  1. 測試並設置(Tetst-and-Set)
  2. 獲取並增加(Fetch-and-Increment)
  3. 交換(Swap)
  4. 比較並交換(Compare-and-Swap)
  5. 加載鏈接/條件存儲(Load-Linked/Store-Conditional)

其中,前面的3條是20世紀時,大部分處理器已經有了,後面的2條是現代處理器新增的。而且這兩條指令的目的和功能是類似的,在IA64,x86 指令集中有 cmpxchg 指令完成 CAS 功能,在 sparc-TSO 也有 casa 指令實現,而在 ARM 和 PowerPC 架構下,則需要使用一對 ldrex/strex 指令來完成 LL/SC 的功能。

CPU 實現原子指令有2種方式:

  1. 通過總線鎖定來保證原子性:
    • 總線鎖定其實就是處理器使用了總線鎖,所謂總線鎖就是使用處理器提供的一個 LOCK# 信號,當一個處理器咋總線上輸出此信號時,其他處理器的請求將被阻塞住,那麽該處理器可以獨占共享內存。但是該方法成本太大。因此有了下面的方式。
  2. 通過緩存鎖定來保證原子性:
    • 所謂 緩存鎖定 是指內存區域如果被緩存在處理器的緩存行中,並且在Lock 操作期間被鎖定,那麽當他執行鎖操作寫回到內存時,處理器不在總線上聲言 LOCK# 信號,而時修改內部的內存地址,並允許他的緩存一致性機制來保證操作的原子性,因為緩存一致性機制會阻止同時修改兩個以上處理器緩存的內存區域數據(這裏和 volatile 的可見性原理相同),當其他處理器回寫已被鎖定的緩存行的數據時,會使緩存行無效。

註意:有兩種情況下處理器不會使用緩存鎖定:

  1. 當操作的數據不能被緩存在處理器內部,或操作的數據跨多個緩存行時,則處理器會調用總線鎖定。
  2. 有些處理器不支持緩存鎖定,對於 Intel 486 和 Pentium 處理器,就是鎖定的內存區域在處理器的緩存行也會調用總線鎖定。

1.AtomicInteger和Unsafe.weakCompareAndSetInt,我們來看一個多線程並發執行計數的例子,代碼如下:

package org.zero.concurrency.demo.example.count;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @program: concurrency-demo
 * @description: 模擬並發請求
 * @author: 01
 * @create: 2018-10-14 00:01
 **/
@Slf4j
public class CountExample1 {

    /**
     * 請求總數
     */
    public static int clientTotal = 5000;

    /**
     * 同時並發執行的線程數量
     */
    public static int threadTotal = 200;

    /**
     * 計數
     */
    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    // 從信號量獲取執行許可,若並發達到設定的數量,那麽就不會獲取到許可,將會阻塞當前線程,直到能夠獲取到執行許可為止
                    semaphore.acquire();
                    CountExample1.add();
                    // 釋放當前線程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("", e);
                }
                countDownLatch.countDown();
            });
        }

        countDownLatch.await();
        executorService.shutdown();
        log.info("count: {}", count);
    }

    private static void add() {
        count++;
    }
}

以上這個例子,每次執行輸出的結果是不確定的,這種就是線程不安全的。如果我們將以上例子中的count類型換成 AtomicInteger,讓這個變量具有原子性的話,就能夠保證線程安全了。修改代碼如下(重復代碼忽略):

...
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class CountExample2 {
    ...

    /**
     * 計數
     */
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ...

        countDownLatch.await();
        executorService.shutdown();
        log.info("count: {}", count.get());
    }

    private static void add() {
        // 先+1,然後獲取當前的值,類似於++count
        count.incrementAndGet();
    }
}

將count變量的類型修改成 AtomicInteger 後,每次執行輸出的結果都會是5000,這樣就具有了線程安全性。那麽這是怎麽做到的呢?我們可以看一下incrementAndGet方法的源碼:

public final int incrementAndGet() {
    return U.getAndAddInt(this, VALUE, 1) + 1;
}

可以看到,在這個方法裏實際是通過U這個對象調用了getAndAddInt方法,往該方法裏傳入了當前對象以及當前對象的值偏移量和增量值1。而這個U是什麽呢?實際上是Unsafe這個類的實例(我這裏使用的是JDK11,其他JDK版本的源碼可能不太一樣,但是都一樣是都是指向Unsafe的實例):

private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();

然後我們來看看Unsafe中getAndAddInt方法的實現代碼:

/**
 * Atomically adds the given value to the current value of a field
 * or array element within the given object {@code o}
 * at the given {@code offset}.
 *
 * @param o object/array to update the field/element in
 * @param offset field/element offset
 * @param delta the value to add
 * @return the previous value
 * @since 1.8
 */
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

這個方法裏主要就是一個do while語句,在while裏面調用了一個weakCompareAndSetInt方法:

@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset,
                                          int expected,
                                          int x) {
    return compareAndSetInt(o, offset, expected, x);
}

而weakCompareAndSetInt方法裏又調用了一個compareAndSetInt方法,這個方法較為核心,而且是使用 native 標識的,代表這是使用其他語言實現的底層方法,例如C/C++:

/**
 * Atomically updates Java variable to {@code x} if it is currently
 * holding {@code expected}.
 *
 * <p>This operation has memory semantics of a {@code volatile} read
 * and write.  Corresponds to C11 atomic_compare_exchange_strong.
 *
 * @return {@code true} if successful
 */
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,
                                             int expected,
                                             int x);

我們再回過頭來看getAndAddInt方法,簡單解釋一下這個方法做的事情及其參數的含義,然後再看它們的調用關系就能清楚了:

/**
 * o 是需要操作的 AtomicInteger 對象
 * offset 是AtomicInteger裏value的地址偏移量
 * delta 需要增加的值,即增量值
 */
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        // 獲取當前主內存裏最新的期望值
        v = getIntVolatile(o, offset);

        // 如果當前o內的value值和期望值v相等,就證明沒有其他線程改變過這個變量,那麽就更新它為期望值v + 增量值delta,否則失敗返回false。如果這一步CAS沒有成功,那就采用無鎖自旋的方式繼續進行CAS操作。這塊是一條CPU指令完成的,依舊是原子操作
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

2.AtomicLong和LongAdder

在之前的例子中,我們可以看到AtomicInteger在執行CAS操作的時候,是用死循環的方式,如果線程競爭非常激烈,那麽失敗量就會很高,性能也就會受到影響。而AtomicLong也是一樣的,它們調用的都是Unsafe裏面的方法,只不過是方法參數類型不一樣而已,實現思想是一樣的。既然有了AtomicLong為什麽還需要LongAdder呢?自然是因為LongAdder有區別於AtomicLong的優點。

我們都知道,JVM會將long,double這些64位的變量的讀寫操作拆分成兩個32位的操作,而LongAdder的設計思想也類似於此。LongAdder的設計思想:

核心是將熱點數據分離,可以將AtomicLong的內部數據value分成一個數組,每個線程訪問時,通過hash等算法映射到其中一個數字進行計數,而最終計數結果為這個數組的求和累加。其中熱點數據value會被分離成多個單元的cell,每個cell獨自維護內部的值,當前對象內value的實際值由所有的cell累積合成,從而使熱點進行了有效的分離,提高了並行度。這樣一來 LongAdder 相當於是在AtomicLong的基礎上將單點的更新壓力分散到各個節點上,在低並發的時候通過對base的直接更新,可以很好的保證和Atomic的性能基本一致,在高並發的場景,通過將熱點分散來提高並行度

缺點:在統計的時候如果有並發更新,可能會導致統計結果有些誤差。

LongAdder.add方法的源碼:

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[getProbe() & m]) == null ||
            !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

在實際使用中,處理高並發的時候,可以優先使用LongAdder,而不是繼續使用AtomicLong。但在需要保證數據無誤差的情況下,則需使用全局唯一的AtomicLong


3.AtomicReference和AtomicReferenceFieldUpdater

AtomicReference和AtomicInteger非常類似,不同之處就在於AtomicInteger是對整數的封裝,而AtomicReference是對對象引用的封裝,AtomicReference用於保證對象引用的原子性。AtomicReference的用法同AtomicInteger一樣,只不過是可以放各種對象。AtomicReference的使用示例如下:

@Slf4j
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        // 期望值為0時,更新為2
        count.compareAndSet(0, 2);  // 更新為2
        // 期望值為0時,更新為1
        count.compareAndSet(0, 1);  // 不會執行
        // 期望值為1時,更新為3
        count.compareAndSet(1, 3);  // 不會執行
        // 期望值為2時,更新為4
        count.compareAndSet(2, 4);  // 更新為4
        // 期望值為3時,更新為5
        count.compareAndSet(3, 5);  // 不會執行

        // 輸出結果為4
        log.info("count:{}", count.get());
    }
}

AtomicReferenceFieldUpdater有基本類型的實現,例如AtomicIntegerFieldUpdater ,它們的核心作用是可以原子性的去更新某一個類的實例裏所指定的某一個字段。AtomicReferenceFieldUpdater可以原子性的更新對象類型的字段,而AtomicIntegerFieldUpdater 則只可以更新整型字段。如下示例:

@Slf4j
public class AtomicExample5 {

    /**
     * 所指定的字段必須是使用volatile關鍵字修飾的,並且是非static的
     */
    @Getter
    public volatile int count = 100;

    /**
     * 指定原子性地更新AtomicExample5實例裏的count字段。第一個參數為類的class,第二個參數為指定的字段名稱
     */
    private static final AtomicIntegerFieldUpdater<AtomicExample5> UPDATER = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");

    public static void main(String[] args) {
        AtomicExample5 example5 = new AtomicExample5();

        // 期望值為100時,更新為120,更新成功返回true,否則返回false
        if (UPDATER.compareAndSet(example5, 100, 120)) {
            log.info("update success 1, {}", example5.getCount());
        }

        // 此時count=120,所以更新失敗返回false
        if (UPDATER.compareAndSet(example5, 100, 120)) {
            log.info("update success 2, {}", example5.getCount());
        } else {
            log.info("update failed, {}", example5.getCount());
        }
    }
}

4.AtomicStampReference

在上文中,我們提到了CAS裏關於ABA的問題,AtomicStampReference類的主要作用就是用於解決CAS裏的ABA問題,該類的方法加上了stamp(戳記)進比較,這個stamp是自行定義的,常見的有使用時間戳等。AtomicStampReference裏的核心方法源碼:

/**
 * Atomically sets the value of both the reference and stamp
 * to the given update values if the
 * current reference is {@code ==} to the expected reference
 * and the current stamp is equal to the expected stamp.
 *
 * @param expectedReference the expected value of the reference
 * @param newReference the new value for the reference
 * @param expectedStamp the expected value of the stamp
 * @param newStamp the new value for the stamp
 * @return {@code true} if successful
 */
public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

5.AtomicLongArray

AtomicLongArray主要作用是可以原子性的更新一個數組裏指定索引位置的值,所以AtomicLongArray裏的方法都會需要傳入一個索引值,例如compareAndSet方法需要傳一個index,源碼如下:

/**
 * Atomically sets the element at index {@code i} to {@code newValue}
 * if the element‘s current value {@code == expectedValue},
 * with memory effects as specified by {@link VarHandle#compareAndSet}.
 *
 * @param i the index
 * @param expectedValue the expected value
 * @param newValue the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int i, long expectedValue, long newValue) {
    return AA.compareAndSet(array, i, expectedValue, newValue);
}

6.AtomicBoolean(平時用的比較多)

經過之前的鋪墊,就已經知道AtomicBoolean可以原子性的操作boolean值。舉一個例子,我們可以利用AtomicBoolean.compareAndSet方法來實現控制某一段代碼只會執行一次。示例代碼如下:

@Slf4j
public class AtomicExample6 {
    /**
     * 執行總數
     */
    public static int execTotal = 5000;

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < execTotal; i++) {
            executorService.execute(AtomicExample6::test);
        }

        executorService.shutdown();
        log.info("isHappened: {}", isHappened.get());
    }

    private static void test() {
        // 只會打印一次日誌
        if (isHappened.compareAndSet(false, true)) {
            log.info("execute");
        }
    }
}

AtomicBoolean.compareAndSet方法源碼如下:

/**
 * Atomically sets the value to {@code newValue}
 * if the current value {@code == expectedValue},
 * with memory effects as specified by {@link VarHandle#compareAndSet}.
 *
 * @param expectedValue the expected value
 * @param newValue the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
    return VALUE.compareAndSet(this,
                               (expectedValue ? 1 : 0),
                               (newValue ? 1 : 0));
}

線程安全性-原子性-synchronized

我們知道原子性提供了互斥訪問,同一時刻只能有一個線程來對它進行操作。在Java裏能保證同一時刻只有一個線程來對其進行操作的,除了atomic包之外,還有鎖機制。

JDK提供的鎖主要分兩種:

  • synchronized:依賴JVM (主要依賴JVM實現鎖,因此在這個關鍵字作用對象的作用範圍內,都是同一時刻只能有一個線程可以進行操作的)
  • Lock:代碼層面的鎖,依賴特殊的CPU指令,通過代碼實現。Lock是一個接口,常用的實現類是ReentrantLock

synchronized是Java中的一個關鍵字,它是一種同步鎖,其修飾的內容有如下四種:

  • 修飾代碼塊:被修飾的代碼稱之為同步語句塊,作用的範圍是大括號括起來的代碼,作用於調用這個代碼塊的對象
  • 修飾方法:被修飾的方法稱之為同步方法,作用的範圍是整個方法,作用於調用這個方法的對象
  • 修飾靜態方法:作用的範圍是整個靜態方法,作用於這個類的所有對象
  • 修飾類:作用的範圍是大括號括起來的部分,作用於這個類的所有對象

實現原子性方式的對比:

  • synchronized:不可中斷鎖,適合競爭不激烈,可讀性好
  • Lock:可中斷鎖,能多樣化同步,競爭激烈時能維持常態
  • Atomic:競爭激烈時能維持常態,比Lock性能好,但只能同步一個值

接下來我們通過一個例子簡單演示一下使用synchronized修飾代碼塊和修飾方法:

@Slf4j
public class SynchronizedExample1 {

    /**
     * 修飾一個代碼塊
     */
    public void test1() {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修飾一個方法
     * 若子類繼承父類,想調用父類的synchronized方法的話,是帶不上synchronized關鍵字的
     * 原因:synchronized 不屬於方法聲明的一部分
     * 如果子類也想使用同步需要在方法上聲明
     */
    public synchronized void test2() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();

        // 使用線程池模擬多線程同時調用同一段sync代碼
        ExecutorService executorService = Executors.newCachedThreadPool();

        // 由於同步塊的作用,所以線程pool-1-thread-1 先輸出0-9,然後pool-1-thread-2 再輸出0-9
        executorService.execute(example1::test1);
        executorService.execute(example1::test1);

        // 由於同步方法只作用於調用該方法的對象,而這裏分別使用了不同對象進行調用
        // 所以這裏線程 pool-1-thread-1和pool-1-thread-2 是交叉輸出的
        executorService.execute(example1::test2);
        executorService.execute(example2::test2);

        executorService.shutdown();
    }
}

然後我們再來看看使用synchronized修飾靜態方法和修飾類的例子:

@Slf4j
public class SynchronizedExample2 {

    /**
     * 修飾類
     */
    public void test1() {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修飾一個靜態方法
     */
    public static synchronized void test2() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample2 example1 = new SynchronizedExample2();
        SynchronizedExample2 example2 = new SynchronizedExample2();

        // 使用線程池模擬多線程同時調用同一段sync代碼
        ExecutorService executorService = Executors.newCachedThreadPool();

        // 由於同步塊的作用,所以線程pool-1-thread-1 先輸出0-9,然後pool-1-thread-2 再輸出0-9
        executorService.execute(example1::test1);
        executorService.execute(example1::test1);

        // 由於靜態同步方法作用於該類的所有對象,所以即便這裏分別使用了不同對象進行調用也是同步的
        executorService.execute(() -> example1.test2());
        executorService.execute(() -> example2.test2());

        executorService.shutdown();
    }
}

註:由於Lock涵蓋的東西比較多,到時候會放到另外一篇文章中介紹,這裏就先略過


線程安全性-可見性

本小節我們來簡單介紹一下線程安全性裏的可見性,可見性是讓一個線程對主內存的修改可以及時的被其他線程觀察到。

與可見性相反的就是不可見性,導致共享變量在線程中不可見的一些主要原因:

  • 線程交叉執行
  • 重排序結合線程交叉執行
  • 共享變量更新後的值沒有在工作內存與主內存間及時更新

Java提供了synchronized 和 volatile 兩種方法來確保可見性

JMM(java內存模型)關於synchronized的兩條規定:

  1. 線程解鎖前,必須把共享變量的最新值刷新到主內存
  2. 線程加鎖時,將清空工作內存中共享變量的值,從而使用共享變量時需要從主內存中重新讀取最新的值(註意,加鎖和解鎖是同一把鎖)

可見性-volatile,volatile通過加入內存屏障和禁止重排序優化來實現可見性:

  • 對volatile 變量寫操作時,會在寫操作後加入一條store屏障指令,將本地內存中的共享變量值刷新到主內存
  • 對volatile變量讀操作時,會在讀操作前加入一條load屏障指令,從主內存中讀取共享變量

volatile寫示意圖:
技術分享圖片

volatile讀示意圖:
技術分享圖片

但是volatile關鍵字能保證其所修飾的變量是線程安全的嗎?實際上並不能,volatile能阻止重排序實現可見性,但是並不具有原子性。我們來看以下這個例子:

@Slf4j
public class CountExample4 {
    /**
     * 請求總數
     */
    public static int clientTotal = 5000;

    /**
     * 同時並發執行的線程數量
     */
    public static int threadTotal = 200;

    /**
     * 計數
     */
    private static volatile int count = 0;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    // 從信號量獲取執行許可,若並發達到設定的數量,那麽就不會獲取到許可,將會阻塞當前線程,直到能夠獲取到執行許可為止
                    semaphore.acquire();
                    CountExample4.add();
                    // 釋放當前線程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("", e);
                }
                countDownLatch.countDown();
            });
        }

        countDownLatch.await();
        executorService.shutdown();
        log.info("count: {}", count);
    }

    /**
     * 這個方法是線程不安全的,會有多個線程同時操作count變量
     * 因為volatile只能保證以下三步執行的順序不會被重排序
     * 但是不保證這三步能夠原子執行,所以volatile是不具備原子性的
     * 也就是說還是有可能會有兩個線程交叉執行這三步,導致執行結果不能確定
     */
    private static void add() {
        // volatile關鍵字修飾的count變量在自增時主要做了以下三步:
        // 1.取當前內存中的count值
        // 2.count值加1
        // 3.重新寫回主內存
        count++;
    }
}

通常來說,使用volatile需要具備兩個條件:

  1. 對變量的寫操作不依賴於當前值
  2. 該變量沒有包含在具有其他變量的不必要的式子中

綜上,volatile特別適合用來做線程標記量,如下示例:

volatile boolean inited = false;

// 線程1
context = loadContext();
inited = true;

// 線程2
while(!inited){
    sleep();
}
doSomethingWithConfig(context);

在這個例子中定義了一個用volatile修飾的共享變量inited,其主要作用是用於線程2判斷線程1是否已完成context的初始化。當線程1初始化context完成時,會修改inited變量的值為true。然後由於volatile的可見性,所以此時線程2馬上就能獲取到inited的值為true,接著就可以使用初始化好的context了。


線程安全性-有序性

本小節我們來介紹一下線程安全性裏的有序性:

  • 在Java內存模型中,允許編譯器和處理器對指令進行重排序,但是重排序過程不會影響到單線程程序的執行,卻會影響到多線程並發執行的正確性

在Java裏,我們可以通過volatile關鍵字保證一定的有序性,另外也可以通過synchronized和Lock來保證有序性。很顯然,synchronized和Lock可以保證在同一時間,只會有一個線程執行同步代碼,相當於是讓線程有序的執行同步代碼,自然就保證了有序性。

另外,Java內存模型具備一些先天的有序性,就是可以不需要通過任何手段就能夠得到保證的有序性,這個通常稱之為Happens-before原則。如果兩個操作的執行次序無法從Happens-before原則中推導出來,那麽這兩個操作就不能保證有序性,虛擬機就可以隨意的對它們進行重排序。

Happens-before原則(先行發生原則),Java內存模型一共列出了八條Happens-before原則,如果兩個操作的次序不能從這八種規則中推倒出來,則不能保證有序性:

  1. 程序次序規則:一個線程內,按照代碼順序,書寫在前面的操作先行發生書寫在後面的操作
  2. 鎖定規則:一個unLock操作先行發生於後面對同一個鎖的lock操作
  3. volatile變量規則:對一個變量的寫操作先行發生於後面對這個變量的讀操作
  4. 傳遞規則:如果操作A先行發生於操作B,而操作B又先行發生於操作C,則可以得出操作A先行發生於操作C
  5. 線程啟動規則:Thread對象的start() 方法先行發生於此線程的每一個動作
  6. 線程中斷規則:對線程interrupt()方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生
  7. 線程終結規則:線程中所有操作都先行發生於線程的終止檢測,我們可以通過Thread.join()方法結束,可以通過Thread.isAlive()的返回值檢測到線程已經終止執行
  8. 對象終結規則:一個對象的初始化完成先行發生於他的finalize()方法的開始

第一條規則要註意理解,這裏只是程序的運行結果看起來像是順序執行,雖然結果是一樣的,但JVM會對沒有變量值依賴的操作進行重排序,這個規則只能保證單線程下執行的有序性,不能保證多線程下的有序性

線程安全性詳解