Java執行緒(三):執行緒協作-生產者/消費者問題
上一篇講述了執行緒的互斥(同步),但是在很多情況下,僅僅同步是不夠的,還需要執行緒與執行緒協作(通訊),生產者/消費者問題是一個經典的執行緒同步以及通訊的案例。該問題描述了兩個共享固定大小緩衝區的執行緒,即所謂的“生產者”和“消費者”在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。要解決該問題,就必須讓生產者在緩衝區滿時休眠(要麼乾脆就放棄資料),等到下次消費者消耗緩衝區中的資料的時候,生產者才能被喚醒,開始往緩衝區新增資料。同樣,也可以讓消費者在緩衝區空時進入休眠,等到生產者往緩衝區新增資料之後,再喚醒消費者,通常採用執行緒間通訊的方法解決該問題。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個執行緒都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。本文講述了JDK5之前傳統執行緒的通訊方式,更高階的通訊方式可參見
假設有這樣一種情況,有一個盤子,盤子裡只能放一個雞蛋,A執行緒專門往盤子裡放雞蛋,如果盤子裡有雞蛋,則一直等到盤子裡沒雞蛋,B執行緒專門從盤子裡取雞蛋,如果盤子裡沒雞蛋,則一直等到盤子裡有雞蛋。這裡盤子是一個互斥區,每次放雞蛋是互斥的,每次取雞蛋也是互斥的,A執行緒放雞蛋,如果這時B執行緒要取雞蛋,由於A沒有釋放鎖,B執行緒處於等待狀態,進入阻塞佇列,放雞蛋之後,要通知B執行緒取雞蛋,B執行緒進入就緒佇列,反過來,B執行緒取雞蛋,如果A執行緒要放雞蛋,由於B執行緒沒有釋放鎖,A執行緒處於等待狀態,進入阻塞佇列,取雞蛋之後,要通知A執行緒放雞蛋,A執行緒進入就緒佇列。我們希望當盤子裡有雞蛋時,A執行緒阻塞,B執行緒就緒,盤子裡沒雞蛋時,A執行緒就緒,B執行緒阻塞,程式碼如下:
輸出結果:import java.util.ArrayList; import java.util.List; /** 定義一個盤子類,可以放雞蛋和取雞蛋 */ public class Plate { /** 裝雞蛋的盤子 */ List<Object> eggs = new ArrayList<Object>(); /** 取雞蛋 */ public synchronized Object getEgg() { while (eggs.size() == 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Object egg = eggs.get(0); eggs.clear();// 清空盤子 notify();// 喚醒阻塞佇列的某執行緒到就緒佇列 System.out.println("拿到雞蛋"); return egg; } /** 放雞蛋 */ public synchronized void putEgg(Object egg) { while (eggs.size() > 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } eggs.add(egg);// 往盤子裡放雞蛋 notify();// 喚醒阻塞佇列的某執行緒到就緒佇列 System.out.println("放入雞蛋"); } static class AddThread implements Runnable { private Plate plate; private Object egg = new Object(); public AddThread(Plate plate) { this.plate = plate; } public void run() { plate.putEgg(egg); } } static class GetThread implements Runnable { private Plate plate; public GetThread(Plate plate) { this.plate = plate; } public void run() { plate.getEgg(); } } public static void main(String args[]) { Plate plate = new Plate(); for(int i = 0; i < 10; i++) { new Thread(new AddThread(plate)).start(); new Thread(new GetThread(plate)).start(); } } }
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
程式開始,A執行緒判斷盤子是否為空,放入一個雞蛋,並且喚醒在阻塞佇列的一個執行緒,阻塞佇列為空;假設CPU又排程了一個A執行緒,盤子非空,執行等待,這個A執行緒進入阻塞佇列;然後一個B執行緒執行,盤子非空,取走雞蛋,並喚醒阻塞佇列的A執行緒,A執行緒進入就緒佇列,此時就緒佇列就一個A執行緒,馬上執行,放入雞蛋;如果再來A執行緒重複第一步,在來B執行緒重複第二步,整個過程就是生產者(A執行緒)生產雞蛋,消費者(B執行緒)消費雞蛋。前段時間看了張孝祥老師執行緒的視訊,講述了一個其學員的面試題,也是執行緒通訊的,在此也分享一下。
題目:子執行緒迴圈10次,主執行緒迴圈100次,如此迴圈100次,好像是空中網的筆試題。
public class ThreadTest2 {
public static void main(String[] args) {
final Business business = new Business();
new Thread(new Runnable() {
@Override
public void run() {
threadExecute(business, "sub");
}
}).start();
threadExecute(business, "main");
}
public static void threadExecute(Business business, String threadType) {
for(int i = 0; i < 100; i++) {
try {
if("main".equals(threadType)) {
business.main(i);
} else {
business.sub(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Business {
private boolean bool = true;
public synchronized void main(int loop) throws InterruptedException {
while(bool) {
this.wait();
}
for(int i = 0; i < 100; i++) {
System.out.println("main thread seq of " + i + ", loop of " + loop);
}
bool = true;
this.notify();
}
public synchronized void sub(int loop) throws InterruptedException {
while(!bool) {
this.wait();
}
for(int i = 0; i < 10; i++) {
System.out.println("sub thread seq of " + i + ", loop of " + loop);
}
bool = false;
this.notify();
}
}
大家注意到沒有,在呼叫wait方法時,都是用while判斷條件的,而不是if,在wait方法說明中,也推薦使用while,因為在某些特定的情況下,執行緒有可能被假喚醒,使用while會迴圈檢測更穩妥。wait和notify方法必須工作於synchronized內部,且這兩個方法只能由鎖物件來呼叫。