1. 程式人生 > 其它 >Java併發程式設計的藝術(六)——執行緒間的通訊

Java併發程式設計的藝術(六)——執行緒間的通訊

多條執行緒之間有時需要資料互動,下面介紹五種執行緒間資料互動的方式,他們的使用場景各有不同。

1. volatile、synchronized關鍵字

PS:關於volatile的詳細介紹請移步至:Java併發程式設計的藝術(三)——volatile

1.1 如何實現通訊?

這兩種方式都採用了同步機制實現多條執行緒間的資料通訊。與其說是“通訊”,倒不如說是“共享變數”來的恰當。當一個共享變數被volatile修飾 或 被同步塊包裹後,他們的讀寫操作都會直接操作共享記憶體,從而各個執行緒都能看到共享變數最新的值,也就是實現了記憶體的可見性。

1.2 特點

  • 這種方式本質上是“共享資料”,而非“傳遞資料”;只是從結果來看,資料好像是從寫執行緒傳遞到了讀執行緒;
  • 這種通訊方式無法指定特定的接收執行緒。當資料被修改後究竟哪條執行緒最先訪問到,這由作業系統隨機決定。
  • 總的來說,這種方式並不是真正意義上的“通訊”,而是“共享”。

1.3 使用場景

這種方式能“傳遞”變數。當需要傳遞一些公用的變數時就可以使用這種方式。如:傳遞boolean flag,用於表示狀態、傳遞一個儲存所有任務的佇列等。

1.4 例子

用這種方式實現執行緒的開關控制。

// 用於控制執行緒當前的執行狀態
private volatile boolean running = false;

// 開啟一條執行緒
Thread thread = new Thread(new Runnable(){
    void run(){
        // 開關
        while(!running){
            Thread.sleep(1000);
        }
        // 執行執行緒任務
        doSometing();
    }
}).start();

// 開始執行
public void start(){
    running = true;
}

2. 等待/通知機制

2.1 如何實現?

等待/通知機制的實現由Java完成,我們只需呼叫Object類的幾個方法即可。

  • wait():將當前執行緒的狀態改為“等待態”,加入等待佇列,釋放鎖;直到當前執行緒發生中斷或呼叫了notify方法,這條執行緒才會被從等待佇列轉移到同步佇列,此時可以開始競爭鎖。
  • wait(long):和wait()功能一樣,只不過多了個超時動作。一旦超時,就會繼續執行wait之後的程式碼,它不會拋超時異常!
  • notify():將等待佇列中的一條執行緒轉移到同步佇列中去。
  • notifyAll():將等待佇列中的所有執行緒都轉移到同步佇列中去。

2.2 注意點

  • 以上方法都必須放在同步塊中;
  • 並且以上方法都只能由所處同步塊的鎖物件呼叫;
  • 鎖物件A.notify()/notifyAll()只能喚醒由鎖物件A wait的執行緒;
  • 呼叫notify/notifyAll函式後僅僅是將執行緒從等待佇列轉移到阻塞佇列,只有當該執行緒競爭到鎖後,才能從wait方法中返回,繼續執行接下來的程式碼;

2.3 QA

  • 為什麼wait必須放在同步塊中呼叫? 因為等待/通知機制需要和共享狀態變數配合使用,一般是先檢查狀態,若狀態為true則執行wait,即包含“先檢查後執行”,因此需要把這一過程加鎖,確保其原子執行。 舉個例子:
// 共享的狀態變數
boolean flag = false;

// 執行緒1
Thread t1 = new Thread(new Runnable(){
    public void run(){
        while(!flag){
            wait();
        }
    }
}).start();

// 執行緒2
Thread t2 = new Thread(new Runnable(){
    public void run(){
        flag = true;
        notifyAll();
    }
}).start();

上述例子thread1未加同步。當thread1執行到while那行後,判斷其狀態為true,此時若發生上下文切換,執行緒2開始執行,並一口氣執行完了;此時flag已經是true,然而thread1繼續執行,遇到wait後便進入等待態;但此時已經沒有執行緒能喚醒它了,因此就一直等待下去。

  • 為什麼notify需要加鎖?且必須和wait使用同一把鎖? 首先,加鎖是為了保證共享變數的記憶體可見性,讓它發生修改後能直接寫入共享記憶體,好讓wait所處的執行緒立即看見。 其次,和wait使用同一把鎖是為了確保wait、notify之間的互斥,即:同一時刻,只能有其中一條執行緒執行。
  • 為什麼必須使用同步塊的鎖物件呼叫wait函式? 首先,由於wait會釋放鎖,因此通過鎖物件呼叫wait就是告訴wait釋放哪個鎖。 其次,告訴執行緒,你是在哪個鎖物件上等待的,只有當該鎖物件呼叫notify時你才能被喚醒。
  • 為什麼必須使用同步塊的鎖物件呼叫notify函式? 告訴notify,只喚醒在該鎖物件上等待的執行緒。

2.4 程式碼實現

等待/通知機制用於實現生產者和消費者模式。

  • 生產者
synchronized(鎖A){
    flag = true;// 或者:list.add(xx);
    鎖A.notify();
}
  • 消費者
synchronized(鎖A){
    // 不滿足條件
    while(!flag){ // 或者:list.isEmpty()
        鎖A.wait();
    }

    // doSometing……
}

2.5 超時等待模式

在之前的生產者-消費者模式中,如果生產者沒有發出通知,那麼消費者將永遠等待下去。為了避免這種情況,我們可以給消費者增加超時等待功能。該功能依託於wait(long)方法,只需在wait前的檢查條件中增加超時標識位,實現如下:

public void get(long mills){
    synchronized( list ){
        // 不加超時功能
        if ( mills <= 0 ) {
            while( list.isEmpty() ){
                list.wait();
            }
        }

        // 新增超時功能
        else {
            boolean isTimeout = false;
            while(list.isEmpty() && isTimeout){
                list.wait(mills);
                isTimeout = true;
            }

            // doSometing……
        }
    }
}

3. 管道流

3.1 作用

管道流用於在兩個執行緒之間進行位元組流或字元流的傳遞。

3.2 特點

  • 管道流的實現依靠PipedOutputStream、PipedInputStream、PipedWriter、PipedReader。分別對應位元組流和字元流。
  • 他們與IO流的區別是:IO流是在硬碟、記憶體、Socket之間流動,而管道流僅在記憶體中的兩條執行緒間流動。

3.3 實現

步驟如下: 1. 在一條執行緒中分別建立輸入流和輸出流; 2. 將輸入流和輸出流連線起來; 3. 將輸入流和輸出流分別傳遞給兩條執行緒; 4. 呼叫read和write方法就可以實現執行緒間通訊。

// 建立輸入流與輸出流物件
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();

// 連線輸入輸出流
out.connect(in);

// 建立寫執行緒
class WriteThread extends Thread{
    private PipedWriter out;

    public WriteThread(PipedWriter out){
        this.out = out;
    }

    public void run(){
        out.write("hello concurrent world!");
    }
}

// 建立讀執行緒
class ReaderThread extends Thread{
    private PipedReader in;

    public ReaderThread(PipedReader in){
        this.in = in;
    }

    public void run(){
        in.read();
    }
}

// 

4. join

4.1 作用

  • join能將併發執行的多條執行緒序列執行;
  • join函式屬於Thread類,通過一個thread物件呼叫。當線上程B中執行threadA.join()時,執行緒B將會被阻塞(底層呼叫wait方法),等到threadA執行緒執行結束後才會返回join方法。
  • 被等待的那條執行緒可能會執行很長時間,因此join函式會丟擲InterruptedException。當呼叫threadA.interrupt()後,join函式就會丟擲該異常。

4.2 實現

public static void main(String[] args){

    // 開啟一條執行緒
    Thread t = new Thread(new Runnable(){
        public void run(){
            // doSometing
        }
    }).start();

    // 呼叫join,等待t執行緒執行完畢
    try{
        t.join();
    }catch(InterruptedException e){
        // 中斷處理……
    }

}