1. 程式人生 > >Java執行緒(三):執行緒協作-生產者/消費者問題

Java執行緒(三):執行緒協作-生產者/消費者問題

        上一篇講述了執行緒的互斥(同步),但是在很多情況下,僅僅同步是不夠的,還需要執行緒與執行緒協作(通訊),生產者/消費者問題是一個經典的執行緒同步以及通訊的案例。該問題描述了兩個共享固定大小緩衝區的執行緒,即所謂的“生產者”和“消費者”在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。要解決該問題,就必須讓生產者在緩衝區滿時休眠(要麼乾脆就放棄資料),等到下次消費者消耗緩衝區中的資料的時候,生產者才能被喚醒,開始往緩衝區新增資料。同樣,也可以讓消費者在緩衝區空時進入休眠,等到生產者往緩衝區新增資料之後,再喚醒消費者,通常採用執行緒間通訊的方法解決該問題。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個執行緒都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。本文講述了JDK5之前傳統執行緒的通訊方式,更高階的通訊方式可參見

Java執行緒(九):Condition-執行緒通訊更高效的方式Java執行緒(篇外篇):阻塞佇列BlockingQueue

        假設有這樣一種情況,有一個盤子,盤子裡只能放一個雞蛋,A執行緒專門往盤子裡放雞蛋,如果盤子裡有雞蛋,則一直等到盤子裡沒雞蛋,B執行緒專門從盤子裡取雞蛋,如果盤子裡沒雞蛋,則一直等到盤子裡有雞蛋。這裡盤子是一個互斥區,每次放雞蛋是互斥的,每次取雞蛋也是互斥的,A執行緒放雞蛋,如果這時B執行緒要取雞蛋,由於A沒有釋放鎖,B執行緒處於等待狀態,進入阻塞佇列,放雞蛋之後,要通知B執行緒取雞蛋,B執行緒進入就緒佇列,反過來,B執行緒取雞蛋,如果A執行緒要放雞蛋,由於B執行緒沒有釋放鎖,A執行緒處於等待狀態,進入阻塞佇列,取雞蛋之後,要通知A執行緒放雞蛋,A執行緒進入就緒佇列。我們希望當盤子裡有雞蛋時,A執行緒阻塞,B執行緒就緒,盤子裡沒雞蛋時,A執行緒就緒,B執行緒阻塞,程式碼如下:

import java.util.ArrayList;
import java.util.List;
/** 定義一個盤子類,可以放雞蛋和取雞蛋 */
public class Plate {
	/** 裝雞蛋的盤子 */
	List<Object> eggs = new ArrayList<Object>();
	/** 取雞蛋 */
	public synchronized Object getEgg() {
		while (eggs.size() == 0) {
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		Object egg = eggs.get(0);
		eggs.clear();// 清空盤子
		notify();// 喚醒阻塞佇列的某執行緒到就緒佇列
		System.out.println("拿到雞蛋");
		return egg;
	}
	/** 放雞蛋 */
	public synchronized void putEgg(Object egg) {
		while (eggs.size() > 0) {
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		eggs.add(egg);// 往盤子裡放雞蛋
		notify();// 喚醒阻塞佇列的某執行緒到就緒佇列
		System.out.println("放入雞蛋");
	}
	static class AddThread implements Runnable  {
		private Plate plate;
		private Object egg = new Object();
		public AddThread(Plate plate) {
			this.plate = plate;
		}
		public void run() {
			plate.putEgg(egg);
		}
	}
	static class GetThread implements Runnable  {
		private Plate plate;
		public GetThread(Plate plate) {
			this.plate = plate;
		}
		public void run() {
			plate.getEgg();
		}
	}
	public static void main(String args[]) {
		Plate plate = new Plate();
		for(int i = 0; i < 10; i++) {
			new Thread(new AddThread(plate)).start();
			new Thread(new GetThread(plate)).start();
		}
	}
}
        輸出結果:
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
放入雞蛋
拿到雞蛋
        程式開始,A執行緒判斷盤子是否為空,放入一個雞蛋,並且喚醒在阻塞佇列的一個執行緒,阻塞佇列為空;假設CPU又排程了一個A執行緒,盤子非空,執行等待,這個A執行緒進入阻塞佇列;然後一個B執行緒執行,盤子非空,取走雞蛋,並喚醒阻塞佇列的A執行緒,A執行緒進入就緒佇列,此時就緒佇列就一個A執行緒,馬上執行,放入雞蛋;如果再來A執行緒重複第一步,在來B執行緒重複第二步,整個過程就是生產者(A執行緒)生產雞蛋,消費者(B執行緒)消費雞蛋。

        前段時間看了張孝祥老師執行緒的視訊,講述了一個其學員的面試題,也是執行緒通訊的,在此也分享一下。

        題目:子執行緒迴圈10次,主執行緒迴圈100次,如此迴圈100次,好像是空中網的筆試題。

public class ThreadTest2 {
	public static void main(String[] args) {
		final Business business = new Business();
		new Thread(new Runnable() {
			@Override
			public void run() {
				threadExecute(business, "sub");
			}
		}).start();
		threadExecute(business, "main");
	}	
	public static void threadExecute(Business business, String threadType) {
		for(int i = 0; i < 100; i++) {
			try {
				if("main".equals(threadType)) {
					business.main(i);
				} else {
					business.sub(i);
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
class Business {
	private boolean bool = true;
	public synchronized void main(int loop) throws InterruptedException {
		while(bool) {
			this.wait();
		}
		for(int i = 0; i < 100; i++) {
			System.out.println("main thread seq of " + i + ", loop of " + loop);
		}
		bool = true;
		this.notify();
	}	
	public synchronized void sub(int loop) throws InterruptedException {
		while(!bool) {
			this.wait();
		}
		for(int i = 0; i < 10; i++) {
			System.out.println("sub thread seq of " + i + ", loop of " + loop);
		}
		bool = false;
		this.notify();
	}
}

       大家注意到沒有,在呼叫wait方法時,都是用while判斷條件的,而不是if,在wait方法說明中,也推薦使用while,因為在某些特定的情況下,執行緒有可能被假喚醒,使用while會迴圈檢測更穩妥。wait和notify方法必須工作於synchronized內部,且這兩個方法只能由鎖物件來呼叫。