1. 程式人生 > 實用技巧 >es 的資料操作

es 的資料操作

執行緒通訊

學習材料來源於網路
如有侵權,聯絡刪除

要想實現多個執行緒之間的協同,如:執行緒執行先後順序、獲取某個執行緒執行的結果等等。

涉及到執行緒之間相互通訊,分為下面四類:

  • 檔案共享
  • 網路共享
  • 共享變數
  • jdk提供的執行緒協調API

JDK提供的執行緒執行緒通訊API: suspend()/resume()、wait()/notify()、park()/unpark()。

檔案共享

檔案共享主要使用檔案系統的讀寫分離機制來實現執行緒通訊。


    public static void main(String args){
        //執行緒1-寫入資料
        new Thread(() ->{try {
            while (true){
                Files.write(Paths.get("Demo7.log"),
                        ("當前時間" + String.valueOf(System.currentTimeMillis())).getBytes());
                Thread.sleep(1000L);
            }
        } catch (Exception e){e.printStackTrace();}
        }).start();
    //執行緒2-讀取資料new 
    new Thread(()->{try {
            while (true){
                Thread.sleep(1000L);
                byte[] allBytes= Files.readAllBytes(Paths.get("Demo7.log"));
                System.out.println(new String(allBytes));
            }
        } catch (Exception e){e.printStackTrace();}
        }).start();
    }

變數共享

//共享變數
public static String content="空";
public static void main(String args){ //執行緒1-寫入資料
    new Thread(()-> {
        try {
            while (true) {
                content = "當前時間" + String.valueOf(System.currentTimeMillis());
                Thread.sleep(1000L);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
    //執行緒2-讀取資料
    new Thread(() ->{
        try {
            while (true){
                Thread.sleep(1000L);
                System.out.println(content);
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }).start();
}

JDK執行緒協作API

JDK中對於需要多執行緒協作完成某一任務的場景,提供了對應API支援。多執行緒協作的典型場景是:生產者-消費者模型。(執行緒阻塞、執行緒喚醒)

示例:執行緒1去買包子,沒有包子,則不再執行。執行緒-2生產出包子,通知執行緒-1繼續執行。

suspend()與resume()

示例:

/** 包子店 */
public static Object baozidian = null;

/** 正常的suspend/resume */
public void suspendResumeTest() throws Exception {
    // 啟動執行緒
    Thread consumerThread = new Thread(() -> {
        if (baozidian == null) { // 如果沒包子,則進入等待
            System.out.println("1、進入等待");
            Thread.currentThread().suspend();
        }
        System.out.println("2、買到包子,回家");
    });
    consumerThread.start();
    // 3秒之後,生產一個包子
    Thread.sleep(3000L);
    baozidian = new Object();
    consumerThread.resume();
    System.out.println("3、通知消費者");
}

死鎖實列:

/** 死鎖的suspend/resume。 suspend並不會像wait一樣釋放鎖,故此容易寫出死鎖程式碼 */
public void suspendResumeDeadLockTest() throws Exception {
    // 啟動執行緒
    Thread consumerThread = new Thread(() -> {
        if (baozidian == null) { // 如果沒包子,則進入等待
            System.out.println("1、進入等待");
            // 當前執行緒拿到鎖,然後掛起
            synchronized (this) {
                Thread.currentThread().suspend();
            }
        }
        System.out.println("2、買到包子,回家");
    });
    consumerThread.start();
    // 3秒之後,生產一個包子
    Thread.sleep(3000L);
    baozidian = new Object();
    // 爭取到鎖以後,再恢復consumerThread
    synchronized (this) {
        consumerThread.resume();
    }
    System.out.println("3、通知消費者");
}

說明:

消費者拿到當前鎖this後,處於掛起狀態,其他執行緒想搶到鎖後才能喚醒,這種業務邏輯就會出現死鎖情況,因為執行緒 suspend()的時候是不會釋放當前的鎖的。

通知的先後順序也會使得執行緒進入死鎖狀態:

/** 導致程式永久掛起的suspend/resume */
public void suspendResumeDeadLockTest2() throws Exception {
    // 啟動執行緒
    Thread consumerThread = new Thread(() -> {
        if (baozidian == null) {
            System.out.println("1、沒包子,進入等待");
            try { // 為這個執行緒加上一點延時
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 這裡的掛起執行在resume後面
            Thread.currentThread().suspend();
        }
        System.out.println("2、買到包子,回家");
    });
    consumerThread.start();
    // 3秒之後,生產一個包子
    Thread.sleep(3000L);
    baozidian = new Object();
    consumerThread.resume();
    System.out.println("3、通知消費者");
    consumerThread.join();
}

執行結果

consumerThread執行的時候,main執行緒先處於暫停3秒狀態,consumerThread執行緒判斷沒有包子,等待5秒,在consumerThread暫停5秒的時候,這個時候在主執行緒結束暫停後,通知consumerThread執行緒喚醒,但是consumerThread執行緒還在暫停時間裡面,沒有進入休眠,所以不會被喚醒,等consumerThread執行緒停止5秒時間過後,進入休眠,這個時候main執行緒已經喚醒過了。所以consumerThread執行緒會一直處於休眠狀態。

wait()與notify()

這些方法只能由同一物件鎖的持有者執行緒呼叫,也就是寫在同步塊裡面,否則會丟擲lllegalMonitorStateException異常。

wait方法導致當前執行緒等待,加入該物件的等待集合中,並且放棄當前持有的物件鎖。

notify/notifyAll方法喚醒一個或所有正在等待這個物件鎖的執行緒。

注意∶雖然會wait自動解鎖,但是對順序有要求,如果在notify被呼叫之後,才開始wait方法的呼叫,執行緒會永遠處於WAITING狀態。

示例:

 /** 正常的wait/notify */
public void waitNotifyTest() throws Exception {
    // 啟動執行緒
    new Thread(() -> {
        if (baozidian == null) { // 如果沒包子,則進入等待
            synchronized (this) {
                try {
                    System.out.println("1、進入等待");
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println("2、買到包子,回家");
    }).start();
    // 3秒之後,生產一個包子
    Thread.sleep(3000L);
    baozidian = new Object();
    synchronized (this) {
        this.notifyAll();
        System.out.println("3、通知消費者");
    }
}

需要明白的是使用wait的時候,執行緒會把鎖釋放

先後順序造成死鎖的問題

/** 會導致程式永久等待的wait/notify */
public void waitNotifyDeadLockTest() throws Exception {
    // 啟動執行緒
    new Thread(() -> {
        if (baozidian == null) { // 如果沒包子,則進入等待
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            synchronized (this) {
                try {
                    System.out.println("1、進入等待");
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println("2、買到包子,回家");
    }).start();
    // 3秒之後,生產一個包子
    Thread.sleep(3000L);
    baozidian = new Object();
    synchronized (this) {
        this.notifyAll();
        System.out.println("3、通知消費者");
    }
}

說明

這種問題的死鎖是因為先後呼叫順序

park()與unpark()

不要求park和unpark方法的呼叫順序。

多次呼叫unpark之後,再呼叫park,執行緒會直接執行。但不會疊加,也就是說,連續多次呼叫park方法,第一次會拿到“許可”直接執行,後續呼叫會進入等待。

執行緒呼叫park則等待“許可”,unpark方法為指定執行緒提供“許可(permit)”。

示例:

/** 正常的park/unpark */
public void parkUnparkTest() throws Exception {
    // 啟動執行緒
    Thread consumerThread = new Thread(() -> {
        if (baozidian == null) { // 如果沒包子,則進入等待
            System.out.println("1、進入等待");
            LockSupport.park();
        }
        System.out.println("2、買到包子,回家");
    });
    consumerThread.start();
    // 3秒之後,生產一個包子
    Thread.sleep(3000L);
    baozidian = new Object();
    LockSupport.unpark(consumerThread);
    System.out.println("3、通知消費者");
}

執行結果

死鎖示例:

/** 死鎖的park/unpark */
public void parkUnparkDeadLockTest() throws Exception {
    // 啟動執行緒
    Thread consumerThread = new Thread(() -> {
        if (baozidian == null) { // 如果沒包子,則進入等待
            System.out.println("1、進入等待");
            // 當前執行緒拿到鎖,然後掛起
            synchronized (this) {
                LockSupport.park();
            }
        }
        System.out.println("2、買到包子,回家");
    });
    consumerThread.start();
    // 3秒之後,生產一個包子
    Thread.sleep(3000L);
    baozidian = new Object();
    // 爭取到鎖以後,再恢復consumerThread
    synchronized (this) {
        LockSupport.unpark(consumerThread);
    }
    System.out.println("3、通知消費者");
}

執行結果:

這種死鎖的問題是因為執行緒在park()的時候,不會釋放當前的鎖。

偽喚醒

警告!之前程式碼中用if語句來判斷,是否進入等待狀態,是錯誤的!

官方建議應該在迴圈中檢查等待條件,原因是處於等待狀態的執行緒可能會收到錯誤警報和偽喚醒,如果不在迴圈中檢查等待條件,程式就會在沒有滿足結束條件的情況下退出。

偽喚醒是指執行緒並非因為notifynotifyallunpark等api呼叫而喚醒,是更底層原因導致的。

// wait
synchronized (obj){
    while (<條件判斷>){
        obj.wait();
        //執行後續操作
    }
}
// park
while(<條件判斷>){
    LockSupport.park();
    //執行後續操作
}