1. 程式人生 > >並發編程實戰-保證線程安全方式

並發編程實戰-保證線程安全方式

compare 順序 不同 兼容 數據結構 align executors war 輸入

線程安全理解

從題目來看,顯然有點大,線程安全問題,從小系統到各分布式系統,其復雜程度可以想象,隨之線程安全的抽象性不言而喻。總之就為了下個定義:一個類在可以被多個線程安全調用時就是線程安全的。

線程安全分類

線程安全不是一個非真即假的命題,可以將共享數據按照安全程度的強弱順序分成以下五類:不可變、絕對線程安全、相對線程安全、線程兼容和線程對立。

1. 什麽是不可變

不可變(Immutable)的對象一定是線程安全的,無論是對象的方法實現還是方法的調用者,都不需要再采取任何的線程安全保障措施,只要一個不可變的對象被正確地構建出來,那其外部的可見狀態永遠也不會改變,永遠也不會看到它在多個線程之中處於不一致的狀態。

不可變的類型:

  • final 關鍵字修飾的基本數據類型;
  • String
  • 枚舉類型
  • Number 部分子類,如 Long 和 Double 等數值包裝類型,BigInteger 和 BigDecimal 等大數據類型。但同為 Number 的子類型的原子類 AtomicInteger 和 AtomicLong 則並非不可變的。

對於集合類型,可以使用 Collections.unmodifiableXXX() 方法來獲取一個不可變的集合。

public class ImmutableExample {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
        unmodifiableMap.put("a", 1);
    }
}
Exception in thread "main" java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
    at ImmutableExample.main(ImmutableExample.java:9)

Collections.unmodifiableXXX() 先對原始的集合進行拷貝,需要對集合進行修改的方法都直接拋出異常。

public V put(K key, V value) {
    throw new UnsupportedOperationException();
}

多線程環境下,應當盡量使對象成為不可變,來滿足線程安全。

2. 絕對線程安全

不管運行時環境如何,調用者都不需要任何額外的同步措施。

3. 相對線程安全

相對的線程安全需要保證對這個對象單獨的操作是線程安全的,在調用的時候不需要做額外的保障措施,但是對於一些特定順序的連續調用,就可能需要在調用端使用額外的同步手段來保證調用的正確性。

在 Java 語言中,大部分的線程安全類都屬於這種類型,例如 Vector、HashTable、Collections 的 synchronizedCollection() 方法包裝的集合等。

對於下面的代碼,如果刪除元素的線程刪除了一個元素,而獲取元素的線程試圖訪問一個已經被刪除的元素,那麽就會拋出 ArrayIndexOutOfBoundsException。

public class VectorUnsafeExample {
    private static Vector<Integer> vector = new Vector<>();

    public static void main(String[] args) {
        while (true) {
            for (int i = 0; i < 100; i++) {
                vector.add(i);
            }
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() -> {
                for (int i = 0; i < vector.size(); i++) {
                    vector.remove(i);
                }
            });
            executorService.execute(() -> {
                for (int i = 0; i < vector.size(); i++) {
                    vector.get(i);
                }
            });
            executorService.shutdown();
        }
    }
}
Exception in thread "Thread-159738" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 3
    at java.util.Vector.remove(Vector.java:831)
    at VectorUnsafeExample.lambda$main$0(VectorUnsafeExample.java:14)
    at VectorUnsafeExample$$Lambda$1/713338599.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

如果要保證上面的代碼能正確執行下去,就需要對刪除元素和獲取元素的代碼進行同步。

executorService.execute(() -> {
    synchronized (vector) {
        for (int i = 0; i < vector.size(); i++) {
            vector.remove(i);
        }
    }
});
executorService.execute(() -> {
    synchronized (vector) {
        for (int i = 0; i < vector.size(); i++) {
            vector.get(i);
        }
    }
});

4. 線程兼容

線程兼容是指對象本身並不是線程安全的,但是可以通過在調用端正確地使用同步手段來保證對象在並發環境中可以安全地使用,我們平常說一個類不是線程安全的,絕大多數時候指的是這一種情況。Java API 中大部分的類都是屬於線程兼容的,如與前面的 Vector 和 HashTable 相對應的集合類 ArrayList 和 HashMap 等。

5. 線程對立

線程對立是指無論調用端是否采取了同步措施,都無法在多線程環境中並發使用的代碼。由於 Java 語言天生就具備多線程特性,線程對立這種排斥多線程的代碼是很少出現的,而且通常都是有害的,應當盡量避免。

線程安全的實現方法

1. 互斥同步

synchronized 和 ReentrantLock。這兩種方式在另外一篇文章synchronized 和 ReentrantLock區別裏有,這裏簡單說一下其實就是用不同的方式保證線程安全。

2. 非阻塞同步

互斥同步最主要的問題就是進行線程阻塞和喚醒所帶來的性能問題,因此這種同步也稱為阻塞同步(Blocking Synchronization)。

從處理問題的方式上說,互斥同步屬於一種悲觀的並發策略,總是認為只要不去做正確的同步措施(例如加鎖),那就肯定會出現問題,無論共享數據是否真的會出現競爭,它都要進行加鎖(這裏討論的是概念模型,實際上虛擬機會優化掉很大一部分不必要的加鎖)、用戶態核心態轉換、維護鎖計數器和檢查是否有被阻塞的線程需要喚醒等操作。

隨著硬件指令集的發展,我們有了另外一個選擇:基於沖突檢測的樂觀並發策略,通俗地說,就是先進行操作,如果沒有其他線程爭用共享數據,那操作就成功了;如果共享數據有爭用,產生了沖突,那就再采取其他的補償措施(最常見的補償措施就是不斷地重試,直到成功為止),這種樂觀的並發策略的許多實現都不需要把線程掛起,因此這種同步操作稱為非阻塞同步(Non-Blocking Synchronization)。

樂觀鎖需要操作和沖突檢測這兩個步驟具備原子性,這裏就不能再使用互斥同步來保證了,只能靠硬件來完成。硬件支持的原子性操作最典型的是:比較並交換(Compare-and-Swap,CAS)。

CAS 指令需要有 3 個操作數,分別是內存位置(在 Java 中可以簡單理解為變量的內存地址,用 V 表示)、舊的預期值(用 A 表示)和新值(用 B 表示)。CAS 指令執行時,當且僅當 V 符合舊預期值 A 時,處理器用新值 B 更新 V 的值,否則它就不執行更新。但是無論是否更新了 V 的值,都會返回 V 的舊值,上述的處理過程是一個原子操作。

J.U.C 包裏面的整數原子類 AtomicInteger,其中的 compareAndSet() 和 getAndIncrement() 等方法都使用了 Unsafe 類的 CAS 操作。

在下面的代碼 1 中,使用了 AtomicInteger 執行了自增的操作。代碼 2 是 incrementAndGet() 的源碼,它調用了 unsafe 的 getAndAddInt() 。代碼 3 是 getAndAddInt() 源碼,var1 指示內存位置,var2 指示新值,var4 指示操作需要加的數值,這裏為 1。在代碼 3 的實現中,通過 getIntVolatile(var1, var2) 得到舊的預期值。通過調用 compareAndSwapInt() 來進行 CAS 比較,如果 var2=var5,那麽就更新內存地址為 var1 的變量為 var5+var4。可以看到代碼 3 是在一個循環中進行,發生沖突的做法是不斷的進行重試。

// 代碼 1
private AtomicInteger cnt = new AtomicInteger();

public void add() {
    cnt.incrementAndGet();
}
// 代碼 2
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// 代碼 3
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

ABA :如果一個變量初次讀取的時候是 A 值,它的值被改成了 B,後來又被改回為 A,那 CAS 操作就會誤認為它從來沒有被改變過。J.U.C 包提供了一個帶有標記的原子引用類“AtomicStampedReference”來解決這個問題,它可以通過控制變量值的版本來保證 CAS 的正確性。大部分情況下 ABA 問題不會影響程序並發的正確性,如果需要解決 ABA 問題,改用傳統的互斥同步可能會比原子類更高效。

3. 無同步方案

要保證線程安全,並不是一定就要進行同步,兩者沒有因果關系。同步只是保證共享數據爭用時的正確性的手段,如果一個方法本來就不涉及共享數據,那它自然就無須任何同步措施去保證正確性,因此會有一些代碼天生就是線程安全的。

(一)可重入代碼(Reentrant Code)

這種代碼也叫做純代碼(Pure Code),可以在代碼執行的任何時刻中斷它,轉而去執行另外一段代碼(包括遞歸調用它本身),而在控制權返回後,原來的程序不會出現任何錯誤。相對線程安全來說,可重入性是更基本的特性,它可以保證線程安全,即所有的可重入的代碼都是線程安全的,但是並非所有的線程安全的代碼都是可重入的。

可重入代碼有一些共同的特征,例如不依賴存儲在堆上的數據和公用的系統資源、用到的狀態量都由參數中傳入、不調用非可重入的方法等。我們可以通過一個簡單的原則來判斷代碼是否具備可重入性:如果一個方法,它的返回結果是可以預測的,只要輸入了相同的數據,就都能返回相同的結果,那它就滿足可重入性的要求,當然也就是線程安全的。

(二)棧封閉

多個線程訪問同一個方法的局部變量時,不會出現線程安全問題,因為局部變量存儲在棧中,屬於線程私有的。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StackClosedExample {
    public void add100() {
        int cnt = 0;
        for (int i = 0; i < 100; i++) {
            cnt++;
        }
        System.out.println(cnt);
    }
}
public static void main(String[] args) {
    StackClosedExample example = new StackClosedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> example.add100());
    executorService.execute(() -> example.add100());
    executorService.shutdown();
}
100
100

(三)線程本地存儲(Thread Local Storage)

如果一段代碼中所需要的數據必須與其他代碼共享,那就看看這些共享數據的代碼是否能保證在同一個線程中執行。如果能保證,我們就可以把共享數據的可見範圍限制在同一個線程之內,這樣,無須同步也能保證線程之間不出現數據爭用的問題。

符合這種特點的應用並不少見,大部分使用消費隊列的架構模式(如“生產者-消費者”模式)都會將產品的消費過程盡量在一個線程中消費完,其中最重要的一個應用實例就是經典 Web 交互模型中的“一個請求對應一個服務器線程”(Thread-per-Request)的處理方式,這種處理方式的廣泛應用使得很多 Web 服務端應用都可以使用線程本地存儲來解決線程安全問題。

可以使用 java.lang.ThreadLocal 類來實現線程本地存儲功能。

對於以下代碼,thread1 中設置 threadLocal 為 1,而 thread2 設置 threadLocal 為 2。過了一段時間之後,thread1 讀取 threadLocal 依然是 1,不受 thread2 的影響。

public class ThreadLocalExample {
    public static void main(String[] args) {
        ThreadLocal threadLocal = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal.set(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadLocal.get());
            threadLocal.remove();
        });
        Thread thread2 = new Thread(() -> {
            threadLocal.set(2);
            threadLocal.remove();
        });
        thread1.start();
        thread2.start();
    }
}
1

為了理解 ThreadLocal,先看以下代碼:

public class ThreadLocalExample1 {
    public static void main(String[] args) {
        ThreadLocal threadLocal1 = new ThreadLocal();
        ThreadLocal threadLocal2 = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal1.set(1);
            threadLocal2.set(1);
        });
        Thread thread2 = new Thread(() -> {
            threadLocal1.set(2);
            threadLocal2.set(2);
        });
        thread1.start();
        thread2.start();
    }
}

它所對應的底層結構圖為:

技術分享圖片


每個 Thread 都有一個 ThreadLocal.ThreadLocalMap 對象,Thread 類中就定義了 ThreadLocal.ThreadLocalMap 成員。

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

當調用一個 ThreadLocal 的 set(T value) 方法時,先得到當前線程的 ThreadLocalMap 對象,然後將 ThreadLocal->value 鍵值對插入到該 Map 中。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

get() 方法類似。

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

ThreadLocal 從理論上講並不是用來解決多線程並發問題的,因為根本不存在多線程競爭。在一些場景 (尤其是使用線程池) 下,由於 ThreadLocal.ThreadLocalMap 的底層數據結構導致 ThreadLocal 有內存泄漏的情況,盡可能在每次使用 ThreadLocal 後手動調用 remove(),以避免出現 ThreadLocal 經典的內存泄漏甚至是造成自身業務混亂的風險。
最佳場景是這樣的:hreadLocal 適用於每個線程需要自己獨立的實例且該實例需要在多個方法中被使用,也即變量在線程間隔離而在方法或類間共享的場景。ThreadLocal的原理和使用場景

並發編程實戰-保證線程安全方式