1. 程式人生 > >高併發程式設計學習(2)——執行緒通訊詳解

高併發程式設計學習(2)——執行緒通訊詳解

為獲得良好的閱讀體驗,請訪問原文: 傳送門
前序文章

  • 高併發程式設計學習(1)——併發基礎 - https://www.wmyskxz.com/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-ji-chu/

一、經典的生產者消費者案例


上一篇文章我們提到一個應用可以建立多個執行緒去執行不同的任務,如果這些任務之間有著某種關係,那麼執行緒之間必須能夠通訊來協調完成工作。

生產者消費者問題(英語:Producer-consumer problem)就是典型的多執行緒同步案例,它也被稱為有限緩衝問題(英語:Bounded-buffer problem)。該問題描述了共享固定大小緩衝區的兩個執行緒——即所謂的“生產者”和“消費者”——在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。(摘自維基百科:生產者消費者問題)

  • 注意: 生產者-消費者模式中的記憶體快取區的主要功能是資料在多執行緒間的共享,此外,通過該緩衝區,可以緩解生產者和消費者的效能差;

準備基礎程式碼:無通訊的生產者消費者

我們來自己編寫一個例子:一個生產者,一個消費者,並且讓他們讓他們使用同一個共享資源,並且我們期望的是生產者生產一條放到共享資源中,消費者就會對應地消費一條。

我們先來模擬一個簡單的共享資源物件:

public class ShareResource {

    private String name;
    private String gender;

    /**
     * 模擬生產者向共享資源物件中儲存資料
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        this.name = name;
        this.gender = gender;
    }

    /**
     * 模擬消費者從共享資源中取出資料
     */
    public void popup() {
        System.out.println(this.name + "-" + this.gender);
    }
}

然後來編寫我們的生產者,使用迴圈來交替地向共享資源中新增不同的資料:

public class Producer implements Runnable {

    private ShareResource shareResource;

    public Producer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            if (i % 2 == 0) {
                shareResource.push("鳳姐", "女");
            } else {
                shareResource.push("張三", "男");
            }
        }
    }
}

接著讓我們的消費者不停地消費生產者產生的資料:

public class Consumer implements Runnable {

    private ShareResource shareResource;

    public Consumer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            shareResource.popup();
        }
    }
}

然後我們寫一段測試程式碼,來看看效果:

public static void main(String[] args) {
    // 建立生產者和消費者的共享資源物件
    ShareResource shareResource = new ShareResource();
    // 啟動生產者執行緒
    new Thread(new Producer(shareResource)).start();
    // 啟動消費者執行緒
    new Thread(new Consumer(shareResource)).start();
}

我們執行發現出現了詭異的現象,所有的生產者都似乎消費到了同一條資料:

張三-男
張三-男
....以下全是張三-男....

為什麼會出現這樣的情況呢?照理說,我的生產者在交替地向共享資源中生產資料,消費者也應該交替消費才對呀..我們大膽猜測一下,會不會是因為消費者是直接迴圈了 30 次列印共享資源中的資料,而此時生產者還沒有來得及更新共享資源中的資料,消費者就已經連續列印了 30 次了,所以我們讓消費者消費的時候以及生產者生產的時候都小睡個 10 ms 來緩解消費太快 or 生產太快帶來的影響,也讓現象更明顯一些:

/**
 * 模擬生產者向共享資源物件中儲存資料
 *
 * @param name
 * @param gender
 */
public void push(String name, String gender) {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.name = name;
    this.gender = gender;
}

/**
 * 模擬消費者從共享資源中取出資料
 */
public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}

再次執行程式碼,發現了出現了以下的幾種情況:

  • 重複消費:消費者連續地出現兩次相同的消費情況(張三-男/ 張三-男);
  • 性別紊亂:消費者消費到了髒資料(張三-女/ 鳳姐-男);

分析出現問題的原因

  • 重複消費:我們先來看看重複消費的問題,當生產者生產出一條資料的時候,消費者正確地消費了一條,但是當消費者再來共享資源中消費的時候,生產者還沒有準備好新的一條資料,所以消費者就又消費到老資料了,這其中的根本原因是生產者和消費者的速率不一致。
  • 性別紊亂:再來分析第二種情況。不同於上面的情況,消費者在消費第二條資料時,生產者也正在生產新的資料,但是尷尬的是,生產者只生產了一半兒(也就是該執行完 this.name = name),也就是還沒有來得及給 gender 賦值就被消費者給取走消費了.. 造成這樣情況的根本原因是沒有保證生產者生產資料的原子性。

解決出現的問題

加鎖解決性別紊亂

我們先來解決性別紊亂,也就是原子性的問題吧,上一篇文章裡我們也提到了,對於這樣的原子性操作,解決方法也很簡單:加鎖。稍微改造一下就好了:

/**
 * 模擬生產者向共享資源物件中儲存資料
 *
 * @param name
 * @param gender
 */
synchronized public void push(String name, String gender) {
    this.name = name;
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.gender = gender;
}

/**
 * 模擬消費者從共享資源中取出資料
 */
synchronized public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}
  • 我們在方法前面都加上了 synchronized 關鍵字,來保證每一次讀取和修改都只能是一個執行緒,這是因為當 synchronized 修飾在普通同步方法上時,它會自動鎖住當前例項物件,也就是說這樣改造之後讀/ 寫操作同時只能進行其一;
  • 我把 push 方法小睡的程式碼改在了賦值 namegender 的中間,以強化驗證原子性操作是否成功,因為如果不是原子性的話,就很可能出現賦值 name 還沒賦值給 gender 就被取走的情況,小睡一會兒是為了加強這種情況的出現概率(可以試著把 synchronized 去掉看看效果);

執行程式碼後發現,並沒有出現性別紊亂的現象了,但是重複消費仍然存在。

等待喚醒機制解決重複消費

我們期望的是 張三-男鳳姐-女 交替出現,而不是有重複消費的情況,所以我們的生產者和消費者之間需要一點溝通,最容易想到的解決方法是,我們新增加一個標誌位,然後在消費者中使用 while 迴圈判斷,不滿足條件則不消費,條件滿足則退出 while 迴圈,從而完成消費者的工作。

while (value != desire) {
    Thread.sleep(10);
}
doSomething();

這樣做的目的就是為了防止「過快的無效嘗試」,這種方法看似能夠實現所需的功能,但是卻存在如下的問題:

  • 1)難以確保及時性。在睡眠時,基本不消耗處理器的資源,但是如果睡得過久,就不能及時發現條件已經變化,也就是及時性難以保證;
  • 2)難以降低開銷。如果降低睡眠的時間,比如休眠 1 毫秒,這樣消費者能夠更加迅速地發現條件變化,但是卻可能消耗更多的處理資源,造成了無端的浪費。

以上兩個問題嗎,看似矛盾難以調和,但是 Java 通過內建的等待/ 通知機制能夠很好地解決這個矛盾並實現所需的功能。

等待/ 通知機制,是指一個執行緒 A 呼叫了物件 O 的 wait() 方法進入等待狀態,而另一個執行緒 B 呼叫了物件 O 的 notifyAll() 方法,執行緒 A 收到通知後從物件 O 的 wait() 方法返回,進而執行後續操作。上述兩個執行緒都是通過物件 O 來完成互動的,而物件上的 waitnotify/ notifyAll 的關係就如同開關訊號一樣,用來完成等待方和通知方之間的互動工作。

這裡有一個比較奇怪的點是,為什麼看起來像是執行緒之間操作的 waitnotify/ notifyAll 方法會是 Object 類中的方法,而不是 Thread 類中的方法呢?

  • 簡單來說:因為 synchronized 中的這把鎖可以是任意物件,因為要滿足任意物件都能夠呼叫,所以屬於 Object 類;
  • 專業點說:因為這些方法在操作同步執行緒時,都必須要標識它們操作執行緒的鎖,只有同一個鎖上的被等待執行緒,可以被同一個鎖上的 notify 喚醒,不可以對不同鎖中的執行緒進行喚醒。也就是說,等待和喚醒必須是同一個鎖。而鎖可以是任意物件,所以可以被任意物件呼叫的方法是定義在 Object 類中。

好,簡單介紹完等待/ 通知機制,我們開始改造吧:

public class ShareResource {

    private String name;
    private String gender;
    // 新增加一個標誌位,表示共享資源是否為空,預設為 true
    private boolean isEmpty = true;

    /**
     * 模擬生產者向共享資源物件中儲存資料
     *
     * @param name
     * @param gender
     */
    synchronized public void push(String name, String gender) {
        try {
            while (!isEmpty) {
                // 當前共享資源不為空的時,則等待消費者來消費
                // 使用同步鎖物件來呼叫,表示當前執行緒釋放同步鎖,進入等待池,只能被其他執行緒所喚醒
                this.wait();
            }
            // 開始生產
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生產結束
            isEmpty = false;
            // 生產結束喚醒一個消費者來消費
            this.notify();
        } catch (Exception ignored) {
        }
    }

    /**
     * 模擬消費者從共享資源中取出資料
     */
    synchronized public void popup() {
        try {
            while (isEmpty) {
                // 為空則等著生產者進行生產
                // 使用同步鎖物件來呼叫,表示當前執行緒釋放同步鎖,進入等待池,只能被其他執行緒所喚醒
                this.wait();
            }
            // 消費開始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消費結束
            isEmpty = true;
            // 消費結束喚醒一個生產者去生產
            this.notify();
        } catch (InterruptedException ignored) {
        }
    }
}
  • 我們期望生產者生產一條,然後就去通知消費者消費一條,那麼在生產和消費之前,都需要考慮當前是否需要生產 or 消費,所以我們新增了一個標誌位來判斷,如果不滿足則等待;
  • 被通知後仍然要檢查條件,條件滿足,則執行我們相應的生產 or 消費的邏輯,然後改變條件(這裡是 isEmpty),並且通知所有等待在物件上的執行緒;
  • 注意:上面的程式碼中通知使用的 notify() 方法,這是因為例子中寫死了只有一個消費者和生產者,在實際情況中建議還是使用 notifyAll() 方法,這樣多個消費和生產者邏輯也能夠保證(可以自己試一下);

小結

通過初始版本一步步地分析問題和解決問題,我們就差不多寫出了我們經典生產者消費者的經典程式碼,但通常消費和生產的邏輯是寫在各自的消費者和生產者程式碼裡的,這裡我為了方便閱讀,把他們都抽離到了共享資源上,我們可以簡單地再來回顧一下這個消費生產和等待通知的整個過程:

以上就是關於生產者生產一條資料,消費者消費一次的過程了,涉及的一些具體細節我們下面來說。

二、執行緒間的通訊方式


等待喚醒機制的替代:Lock 和 Condition

我們從上面的中看到了 wait()notify() 方法,只能被同步監聽鎖物件來呼叫,否則就會報出 IllegalMonitorZStateException 的異常,那麼現在問題來了,我們在上一篇提到的 Lock 機制根本就沒有同步鎖了,也就是沒有自動獲取鎖和自動釋放鎖的概念,因為沒有同步鎖,也就意味著 Lock 機制不能呼叫 waitnotify 方法,我們怎麼辦呢?

好在 Java 5 中提供了 Lock 機制的同時也提供了用於 Lock 機制控制通訊的 Condition 介面,如果大家理解了上面說到的 Object.wait()Object.notify() 方法的話,那麼就能很容易地理解 Condition 物件了。

它和 wait()notify() 方法的作用是大致相同的,只不過後者是配合 synchronized 關鍵字使用的,而 Condition 是與重入鎖相關聯的。通過 Lock 介面(重入鎖就實現了這一介面)的 newCondition() 方法可以生成一個與當前重入鎖繫結的 Condition 例項。利用 Condition 物件,我們就可以讓執行緒在合適的時間等待,或者在某一個特定的時刻得到通知,繼續執行。

我們拿上面的生產者消費者來舉例,修改成 Lock 和 Condition 程式碼如下:

public class ShareResource {

    private String name;
    private String gender;
    // 新增加一個標誌位,表示共享資源是否為空,預設為 true
    private boolean isEmpty = true;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    /**
     * 模擬生產者向共享資源物件中儲存資料
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        lock.lock();
        try {
            while (!isEmpty) {
                // 當前共享資源不為空的時,則等待消費者來消費
                condition.await();
            }
            // 開始生產
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生產結束
            isEmpty = false;
            // 生產結束喚醒消費者來消費
            condition.signalAll();
        } catch (Exception ignored) {
        } finally {
            lock.unlock();
        }
    }

    /**
     * 模擬消費者從共享資源中取出資料
     */
    public void popup() {
        lock.lock();
        try {
            while (isEmpty) {
                // 為空則等著生產者進行生產
                condition.await();
            }
            // 消費開始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消費結束
            isEmpty = true;
            // 消費結束喚醒生產者去生產
            condition.signalAll();
        } catch (InterruptedException ignored) {
        } finally {
            lock.unlock();
        }
    }
}

在 JDK 內部,重入鎖和 Condition 物件被廣泛地使用,以 ArrayBlockingQueue 為例,它的 put() 方法實現如下:

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

// 建構函式,初始化鎖以及對應的 Condition 物件
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 等待佇列有足夠的空間
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 通知需要 take() 的執行緒,佇列已有資料
    notEmpty.signal();
}

同理,對應的 take() 方法實現如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果佇列為空,則消費者佇列要等待一個非空的訊號
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

允許多個執行緒同時訪問:訊號量(Semaphore)

以下內容摘錄 or 改編自 《實戰 Java 高併發程式設計》 3.1.3 節的內容

訊號量為多執行緒協作提供了更為強大的控制方法。廣義上說,訊號量是對鎖的擴充套件,無論是內部鎖 synchronized 還是重入鎖 ReentrantLock,一次都只允許一個執行緒訪問一個資源,而訊號量卻可以指定多個執行緒,同時訪問某一個資源。訊號量主要提供了以下建構函式:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)        // 第二個引數可以指定是否公平

在構造訊號量物件時,必須要指定訊號量的准入數,即同時能申請多少個許可。當每個執行緒每次只申請一個許可時,這就相當於指定了同時有多少個執行緒可以訪問某一個資源。訊號量的主要邏輯如下:

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
  • acquire() 方法嘗試獲得一個准入的許可。若無法獲得,則執行緒會等待,直到有執行緒釋放一個許可或者當前執行緒被中斷。
  • acquireUninterruptibly() 方法和 acquire() 方法類似,但是不響應中斷。
  • tryAcquire() 嘗試獲得一個許可,如果成功則返回 true,失敗則返回 false,它不會進行等待,立即返回。
  • release() 用於線上程訪問資源結束後,釋放一個許可,以使其他等待許可的執行緒可以進行資源訪問。

在 JDK 的官方 Javadoc 中,就有一個有關訊號量使用的簡單例項,有興趣的讀者可以自行去翻閱一下,這裡給出一個更傻瓜化的例子:

public class SemapDemo implements Runnable {

    final Semaphore semaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            // 模擬耗時操作
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
            semaphore.release();
        } catch (InterruptedException ignore) {
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
    }
}

執行程式,就會發現系統以 5 個執行緒為單位,依次輸出帶有執行緒 ID 的提示文字。

在實現上,Semaphore 藉助了執行緒同步框架 AQS(AbstractQueuedSynchornizer),同樣藉助了 AQS 來實現的是 Java 中可重入鎖的實現。AQS 的強大之處在於,你僅僅需要繼承它,然後使用它提供的 api 就可以實現任意複雜的執行緒同步方案,AQS 為我們做了大部分的同步工作,所以這裡不細說,之後再來詳細探究一下...

我等著你:Thread.join()

如果一個執行緒 A 執行了 thread.join() 方法,其含義是:當前執行緒 A 等待 thread 執行緒終止之後才從 thread.join() 返回。執行緒 Thread 除了提供 join() 方法之外,還提供了 join(long millis)join(long millis, int nanos) 兩個具備超時特性的方法。這兩個超時方法表示,如果執行緒 Thread 在給定的超時時間裡沒有終止,那麼將會從該超時方法中返回。

在下面的程式碼中,我們建立了 10 個執行緒,編號 0 ~ 9,每個執行緒呼叫前一個執行緒的 join() 方法,也就是執行緒 0 結束了,執行緒 1 才能從 join() 方法中返回,而執行緒 0 需要等待 main 執行緒結束。

public class Join {

    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每個執行緒擁有前一個執行緒的引用,需要等待前一個執行緒終止,才能從等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate. ");
    }

    static class Domino implements Runnable {

        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException ignore) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate. ");
        }
    }
}

執行程式,可以看到下列輸出:

main terminate. 
0 terminate. 
1 terminate. 
2 terminate. 
3 terminate. 
4 terminate. 
5 terminate. 
6 terminate. 
7 terminate. 
8 terminate. 
9 terminate. 

說明每個執行緒終止的前提都是前驅執行緒的終止,每個執行緒等待前驅執行緒結束後,才從 join() 方法中返回,這裡涉及了等待/ 通知機制,在 JDK 的原始碼中,我們可以看到 join() 的方法如下:

public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        // 條件不滿足則繼續等待
        while (isAlive()) {
            wait(0);
        }
        // 條件符合則返回
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

當執行緒終止時,會呼叫執行緒自身的 notifyAll() 方法,會通知所有等待在該執行緒物件上的執行緒。可以看到 join() 方法的邏輯結構跟我們上面寫的生產者消費者類似,即加鎖、迴圈和處理邏輯三個步驟。

三、執行緒之間的資料互動


保證可見性:volatile 關鍵字

我們先從一個有趣的例子入手:

private static boolean isOver = false;

public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(() -> {
        while (!isOver) {
        }
        System.out.println("執行緒已感知到 isOver 置為 true,執行緒正常返回!");
    });
    thread.start();
    Thread.sleep(500);
    isOver = true;
    System.out.println("isOver 已置為 true");
}

我們開啟了一個主執行緒和一個子執行緒,我們期望子執行緒能夠感知到 isOver 變數的變化以結束掉死迴圈正常返回,但是執行程式卻發現並不是像我們期望的那樣發生,子執行緒一直處在了死迴圈的狀態!

為什麼會這樣呢?

Java 記憶體模型

關於這一點,我們有幾點需要說明,首先需要搞懂 Java 的記憶體模型:

Java 虛擬機器規範中試圖定義一種 Java 記憶體模型(Java Memory Model, JMM)來遮蔽掉各層硬體和作業系統的記憶體訪問差異,以實現讓 Java 程式在各種平臺下都能達到一致的記憶體訪問效果。

Java 記憶體模型規定了所有的變數都儲存在主記憶體(Main Memory)中。每條執行緒還有自己的工作記憶體(Working Memory),執行緒的工作記憶體中儲存了被該執行緒使用到的變數的主記憶體副本拷貝,執行緒對變數的所有操作(讀取、賦值等)都必須在主記憶體中進行,而不能直接讀寫主記憶體中的變數。不同的執行緒之間也無法直接訪問對方工作記憶體中的變數,執行緒間的變數值的傳遞均需要通過主記憶體來完成,執行緒、主記憶體、工作記憶體三者的關係如上圖。

那麼不同的執行緒之間是如何通訊的呢?

在共享記憶體的併發模型裡,執行緒之間共享程式的公共狀態,執行緒之間通過寫-讀記憶體中的公共狀態來隱式進行通訊,典型的共享記憶體通訊方式就是通過共享物件進行通訊。

例如上圖執行緒 A 與 執行緒 B 之間如果要通訊的話,那麼就必須經歷下面兩個步驟:

  1. 首先,執行緒 A 把本地記憶體 A 更新過的共享變數重新整理到主記憶體中去
  2. 然後,執行緒 B 到主記憶體中去讀取執行緒 A 之前更新過的共享變數

在訊息傳遞的併發模型裡,執行緒之間沒有公共狀態,執行緒之間必須通過明確的傳送訊息來顯式進行通訊,在 Java 中典型的訊息傳遞方式就是 wait()notify()

說回剛才出現的問題,就很容易理解了:每個執行緒都有獨佔的記憶體區域,如操作棧、本地變量表等。執行緒本地儲存了引用變數在堆記憶體中的副本,執行緒對變數的所有操作都在本地記憶體區域中進行,執行結束後再同步到堆記憶體中去。也就是說,我們在主執行緒中修改的 isOver 的值並沒有被子執行緒讀取到(沒有被刷入主記憶體),也就造成了子執行緒對於 isOver 變數不可見。

解決方法也很簡單,只需要在 isOver 變數前加入 volatile 關鍵字就可以了,這是因為加入了 volatile 修飾的變數允許直接與主記憶體互動,進行讀寫操作,保證可見性。

指令重排/ happen-before 原則

再從另一個有趣的例子中入手,這是在高併發場景下會存在的問題:

class LazyInitDemo {
    private static TransationService service = null;
    
    public static TransationService getTransationService(){
        if (service == null) {
            synchronized (this) {
                if (service == null) {
                    service = new TransationService();
                }
            }
        }
    }
}

這是一個典型的雙重檢查鎖定思想,這段程式碼也是一個典型的雙重檢查鎖定(Double-checked Locking)問題。在高併發的情況下,該物件引用在沒有同步的情況下進行讀寫操作,導致使用者可能會獲取未構造完成的物件。

這是因為指令優化的結果。計算機不會根據程式碼順序按部就班地執行相關指令,我們來舉一個借書的例子:假如你要去還書並且想要借一個《高併發程式設計學習》系列叢書,而你的室友恰好也要還書,並且還想讓你幫忙借一本《Java 從入門到放棄》。

這件事乍一看有兩件事:你的事和你室友的事。先辦完你的事,再開始處理你室友的事情是屬於單執行緒的死板行為,此時你會潛意識地進行「優化」,例如你可以把你要還的書和你室友需要還的書一起還了,再一起把想要借的書借出來,這其實就相當於合併資料進行存取的操作過程了。

我們知道一條指令的執行是可以分成很多步驟的,簡單地說,可以分為:

  • 取值 IF
  • 譯碼和去暫存器運算元 ID
  • 執行或者有效地址計算 EX
  • 儲存器訪問 MEM
  • 寫回 WB

由於每一個步驟可能使用不同的硬體完成,因此,聰明的工程師就發明了流水線技術來執行指令,如下圖所示:

可以看到,當第 2 條指令執行時,第 1 條執行其實並沒有執行完,確切地說第一條指令還沒有開始執行,只是剛剛完成了取值操作而已。這樣的好處非常明顯,假如這裡每一個步驟都需要花費 1 毫秒,那麼指令 2 等待指令 1 完全執行後再執行,則需要等待 5 毫秒,而使用流水線指令,指令 2 只需要等待 1 毫秒就可以執行了。如此大的效能提升,當然讓人眼紅。

回到最初的問題,我們分析一下:對於 Java 編譯器來說,初始化 TransactionService 例項和將物件地址寫到 service 欄位並非原子操作,且這兩個階段的執行順序是未定義的。加入某個執行緒執行 new TransactionService() 時,構造方法還未被呼叫,編譯器僅僅為該物件分配了記憶體空間並設為預設值,此時若另一個執行緒呼叫 getTransactionService() 方法,由於 service != null,但是此時 service 物件還沒有被賦予真正的有效值,從而無法取到正確的 service 單例物件。

對於此問題,一種較為簡單的解決方案就是用 volatile 關鍵字修飾目標屬性(適用於 JDK5 及以上版本),這樣 service 就限制了編譯器對它的相關讀寫操作,對它的讀寫操作進行指令重排,確定物件例項化之後才返回引用。

另外指令重排也有自己的規則,並非所有的指令都可以隨意改變執行位置,下面列舉一下基本的原則:

  • 程式次序規則:一個執行緒內,按照程式碼順序,書寫在前面的操作先行發生於書寫在後面的操作;
  • 鎖定規則:一個 unLock 操作先行發生於後面對同一個鎖的 lock 操作;
  • volatile 變數規則:對一個變數的寫操作先行發生於後面對這個變數的讀操作;
  • 傳遞規則:如果操作 A 先行發生於操作 B,而操作 B 又先行發生於操作 C,則可以得出操作 A 先行發生於操作 C;
  • 執行緒啟動規則:Thread 物件的 start() 方法先行發生於此執行緒的每個一個動作;
  • 執行緒中斷規則:對執行緒 interrupt() 方法的呼叫先行發生於被中斷執行緒的程式碼檢測到中斷事件的發生;
  • 執行緒終結規則:執行緒中所有的操作都先行發生於執行緒的終止檢測,我們可以通過 Thread.join() 方法結束、Thread.isAlive() 的返回值手段檢測到執行緒已經終止執行;
  • 物件終結規則:一個物件的初始化完成先行發生於他的 finalize() 方法的開始;

volatile 不保證原子性

volatile 解決的是多執行緒共享變數的可見性問題,類似於 synchronized,但不具備 synchronized 的互斥性。所以對 volatile 變數的操作並非都具有原子性,例如我們用下面的例子來說明:

public class VolatileNotAtomic {

    private static volatile long count = 0L;
    private static final int NUMBER = 10000;

    public static void main(String[] args) {
        Thread subtractThread = new SubstractThread();
        subtractThread.start();

        for (int i = 0; i < NUMBER; i++) {
            count++;
        }

        // 等待減法執行緒結束
        while (subtractThread.isAlive()) {
        }

        System.out.println("count 最後的值為: " + count);
    }

    private static class SubstractThread extends Thread {

        @Override
        public void run() {
            for (int i = 0; i < NUMBER; i++) {
                count--;
            }
        }
    }
}

多次執行後,發現結果基本都不為 0。只有在 count++count-- 兩處都進行加鎖時,才能正確的返回 0,瞭解 Java 的童鞋都應該知道這 count++count-- 都不是一個原子操作,這裡就不作說明了。

volatile 的使用優化

在瞭解一點吧,註明的併發程式設計大師 Doug lea 在 JDK 7 的併發包裡新增一個佇列集合類 LinkedTransferQueue,它在使用 volatile 變數時,用一種追加位元組的方式來優化對列出隊和入隊的效能,具體的可以看一下下列的連結,這裡就不具體說明了。

  • 追加位元組方式來優化佇列效能? - https://my.oschina.net/u/3694754/blog/2990652

保證原子性:synchronized

Java 中任何一個物件都有一個唯一與之關聯的鎖,這樣的鎖作為該物件的一系列標誌位儲存在物件資訊的頭部。Java 物件頭裡的 Mark Word 裡預設的存放的物件的 Hashcode/ 分代年齡和鎖標記位。32 為JVM Mark Word 預設儲存結構如下:

Java SE 1.6中,鎖一共有 4 種狀態,級別從低到高依次是:無鎖狀態、偏向鎖狀態、輕量級鎖狀態和重量級鎖狀態,這幾個狀態會隨著競爭情況逐漸升級。鎖可以升級但不能降級,意味著偏向鎖升級成輕量級鎖後不能降級成偏向鎖。這種鎖升級卻不能降級的策略,目的是為了提高獲得鎖和釋放鎖的效率。

偏向鎖

HotSpot 的作者經過研究發現,大多數情況下,鎖不僅不存在多執行緒競爭,而且總是由同一執行緒多次獲得,為了讓執行緒獲得鎖的代價更低而引入了偏向鎖。

  • 偏向鎖的獲取:當一個執行緒訪問同步塊並獲取鎖時,會在物件頭和棧幀中的鎖記錄裡儲存鎖偏向的執行緒 ID,以後該執行緒在進入和退出同步塊時不需要進行 CAS 操作來加鎖和解鎖,只需簡單地測試一下物件頭的 Mark Word 裡是否儲存著指向當前執行緒的偏向鎖。如果測試成功,表示執行緒已經獲得了鎖。如果測試失敗,則需要再測試一下 Mark Word 中偏向鎖的標識是否設定成 1(表示當前是偏向鎖),如果沒有設定,則使用CAS競爭鎖;如果設定了,則嘗試使用CAS將物件頭的偏向鎖指向當前執行緒。

  • 偏向鎖的撤銷:偏向鎖使用了一種等到競爭出現才釋放鎖的機制,所以當其他執行緒嘗試競爭偏向鎖時,持有偏向鎖的執行緒才會釋放鎖。

下圖執行緒 1 展示了偏向鎖獲取的過程,執行緒 2 展示了偏向鎖撤銷的過程。

輕量級鎖和自旋鎖

如果偏向鎖失敗,虛擬機器並不會立即掛起執行緒。它還會使用一種稱為輕量級鎖的優化手段。

執行緒在執行同步塊之前,JVM 會先在當前執行緒的棧楨中建立用於儲存鎖記錄的空間,並將物件頭中的 Mark Word 複製到鎖記錄中,官方稱為 Displaced Mark Word。然後執行緒嘗試使用 CAS 將物件頭中的 Mark Word 替換為指向鎖記錄的指標。如果成功,當前執行緒獲得鎖,如果失敗,表示其他執行緒競爭鎖,當前執行緒便嘗試使用自旋(自己執行幾個空迴圈再進行嘗試)來獲取鎖。

輕量級解鎖時,會使用原子的 CAS 操作將 Displaced Mark Word 替換回到物件頭,如果成功,則表示沒有競爭發生。如果失敗,表示當前鎖存在競爭,鎖就會膨脹成重量級鎖。下圖是兩個執行緒同時爭奪鎖,導致鎖膨脹的流程圖。

幾種鎖的比較

下圖就簡單概括了一下幾種鎖的比較:

每人一支筆:ThreadLocal

除了控制資源的訪問外,我們還可以通過增加資源來保證所有物件的執行緒安全。比如,讓 100 個人填寫個人資訊表,如果只有一支筆,那麼大家就得挨個寫,對於管理人員來說,必須保證大家不會去哄搶這僅存的一支筆,否則,誰也填不完。從另外一個角度出發,我們可以乾脆就準備 100 支筆,那麼所有人都可以各自為營,很快就能完成表格的填寫工作。

如果說鎖是使用第一種思路,那麼 ThreadLocal 就是使用第二種思路了。

當使用 ThreadLocal 維護變數時,其為每個使用該變數的執行緒提供獨立的變數副本,所以每一個執行緒都可以獨立的改變自己的副本,而不會影響其他執行緒對應的副本。

ThreadLocal 內部實現機制:

  1. 每個執行緒內部都會維護一個類似 HashMap 的物件,稱為 ThreadLocalMap,裡邊會包含若干了 Entry(K-V 鍵值對),相應的執行緒被稱為這些 Entry 的屬主執行緒;

  2. Entry 的 Key 是一個 ThreadLocal 例項,Value 是一個執行緒特有物件。Entry 的作用即是:為其屬主執行緒建立起一個 ThreadLocal 例項與一個執行緒特有物件之間的對應關係;

  3. Entry 對 Key 的引用是弱引用;Entry 對 Value 的引用是強引用。

ThreadLodal 的副作用

為了讓執行緒安全地共享某個變數,JDK 開出了 ThreadLocal 這副藥方,但「是藥三分毒」,ThreadLocal 也有一定的副作用。主要問題是「產生髒資料」和「記憶體洩漏」。這兩個問題通常是線上程池中使用 ThreadLocal 引發的,因為執行緒池有 「執行緒複用」 和 「記憶體常駐」 兩個特點。

髒資料

執行緒複用會產生髒資料。由於執行緒池會重用 Thread 物件,那麼與 Thread 繫結的類的靜態屬性 ThreadLocal 變數也會被重用。如果在實現的執行緒 run() 方法中不顯式地 remove() 清理與執行緒相關的 ThreadLocal 資訊,那麼倘若下一個執行緒不呼叫 set() 設定初始值,就可能 get() 到重用的執行緒資訊,包括 ThreadLocal 所關聯的執行緒物件的 value 值。

為了方便理解,用一段簡要程式碼來模擬,如下所示:

public class DirtyDataInThreadLocal {

    public static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        // 使用固定大小為 1 的執行緒池,說明上一個的執行緒屬性會被下一個執行緒屬性複用
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 2; i++) {
            Mythread mythread = new Mythread();
            pool.execute(mythread);
        }
    }

    private static class Mythread extends Thread {

        private static boolean flag = true;

        @Override
        public void run() {
            if (flag) {
                // 第 1 個執行緒 set 後,並沒有進行 remove
                // 而第二個執行緒由於某種原因沒有進行 set 操作
                threadLocal.set(this.getName() + ", session info.");
                flag = false;
            }
            System.out.println(this.getName() + " 執行緒是 " + threadLocal.get());
        }
    }
}

執行結果:

Thread-0 執行緒是 Thread-0, session info.
Thread-1 執行緒是 Thread-0, session info.

記憶體洩漏

在原始碼註釋中提示使用 static 關鍵字來修飾 ThreadLocal。在此場景下,寄希望於 ThreadLocal 物件失去引用後,觸發弱引用機制來回收 Entry 的 Value 就變得不現實了。在上面的例子中,如果不進行 remove() 操作,那麼這個執行緒執行完成後,通過 ThreadLocal 物件持有的 String 物件是不會被釋放的。

以上兩個問題的解決辦法很簡單,就是在每次使用完 ThreadLocal 時,必須要及時呼叫 remove() 方法清理。

參考資料


  1. 《Java 零基礎入門教程》 - http://study.163.com/course/courseMain.htm?courseId=1003108028
  2. 《Java 併發程式設計的藝術》
  3. 《碼出高效 Java 開發手冊》 - 楊冠寶(孤盡) 高海慧(鳴莎)著
  4. Java面試知識點解析(二)——高併發程式設計篇 - https://www.wmyskxz.com/2018/05/10/java-mian-shi-zhi-shi-dian-jie-xi-er-gao-bing-fa-bian-cheng-pian/
  5. 讓你徹底理解Synchronized - https://www.jianshu.com/p/d53bf830fa09
  6. 《Offer來了 - Java面試核心知識點精講》 - 王磊 編著
  7. 《實戰Java高併發程式設計》 - 葛一鳴 郭超 編著

按照慣例黏一個尾巴:

歡迎轉載,轉載請註明出處!
獨立域名部落格:wmyskxz.com
簡書 ID:@我沒有三顆心臟
github:wmyskxz
歡迎關注公眾微訊號:wmyskxz
分享自己的學習 & 學習資料 & 生活
想要交流的朋友也可以加 qq 群:3382693