1. 程式人生 > >Java多執行緒之執行緒併發庫條件阻塞Condition的應用

Java多執行緒之執行緒併發庫條件阻塞Condition的應用

鎖(Lock/synchronized)只能實現互斥不能實現通訊,Condition的功能類似於在傳統的執行緒技術中的,Object.wait()和Object.notify()的功能,在等待Condition時,允許發生"虛假喚醒",這通常作為對基礎平臺語義的讓步,對於大多數應用程式,這帶來的實際影響很小,因為Condition應該總是在一個迴圈中被等待,並測試正被等待的狀態宣告.某個實現可以隨意移除可能的虛假喚醒,但是建議程式設計師總是假定這些虛假喚醒可能發生,因此總是在一個迴圈中等待.
一個鎖內部可以有多個Condition,即有多路等待和通知,可以參看jdk1.5提供的Lock與Condition實現的可阻塞佇列的應用案例,從中要體味演算法,還要體味面向物件的封裝.在傳統的執行緒機制中,一個監視器物件上只能有一路等待和通知,要想實現多路等待和通知,必須巢狀使用多個同步監視器物件.(如果只用一個Condition,兩個放的都在等,一旦一個放進去了,那麼它會通知可能會導致另一個放的接著往下走).
我們也可以通過Lock和Condition來實現上面我們講的例子:
子執行緒迴圈10次,接著主執行緒迴圈100,接著又回到子執行緒迴圈10次,接著再回到主執行緒又迴圈100,如此迴圈50次,請寫出程式。

package javaplay.thread.test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionCommunication {
	public static void main(String[] args) {
		final Business business = new Business();
		new Thread(new Runnable() {

			@Override
			public void run() {
				for (int i = 0; i < 5; i++) {
					business.sub(i);
				}

			}

		}).start();

		for (int i = 0; i < 5; i++) {
			business.main(i);// 此處不能是sub 否則死鎖
		}

	}

	static class Business {
		// 使用lock與原來的synchronized幾乎一模一樣的
		// 思想和套路是一樣的 不在乎用得是什麼新技術
		// 所以程式設計師如果崇拜哪種語言或者技術都是很可笑的一件事
		Lock lock = new ReentrantLock();
		Condition condition = lock.newCondition();
		private boolean bShouldSub = true;// volatile???

		public void sub(int i) {
			lock.lock();
			try {
				while (!bShouldSub) {
					try {
						// this.wait();
						condition.await();// 不是wait wait是任何物件都有的,繼承自Object
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				for (int j = 0; j < 1; j++) {
					System.out.println("sub thread sequence of " + j + " ,loop of " + i);
				}
				bShouldSub = false;
				// this.notify();
				condition.signal();// 同樣不能使用notify
			} finally {
				lock.unlock();
			}
		}

		public void main(int i) {
			lock.lock();
			try {
				while (bShouldSub) {
					try {
						// this.wait();
						condition.await();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				for (int j = 0; j < 2; j++) {
					System.out.println("main thread sequence of " + j + " ,loop of " + i);
				}
				bShouldSub = true;
				condition.signal();
				// this.notify();
				// wait和notify必須在synchronized塊中,synchronized加在誰身上,就呼叫誰的wait/notify
			} finally {
				lock.unlock();
			}

		}
	}

}

可以使用Lock和Condition來實現一個緩衝佇列(要區別緩衝和快取的區別), 其實condition doc有這樣的例子:

package javaplay.thread.test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*
 * A Condition instance is intrinsically bound to a lock. 
 * To obtain a Condition instance for a particular Lock instance use 
 * its newCondition() method.

 * As an example, suppose we have a bounded buffer which supports put and take methods. 
 * If a take is attempted on an empty buffer, then the thread will block until 
 * an item becomes available; if a put is attempted on a full buffer, 
 * then the thread will block until a space becomes available. 
 * We would like to keep waiting put threads and take threads in separate wait-sets 
 * so that we can use the optimization of only notifying a single thread at a time 
 * when items or spaces become available in the buffer. This can be achieved using 
 * two Condition instances.
 * 可阻塞佇列如下:
 */
class BoundedBuffer {
	final Lock lock = new ReentrantLock();
	// 此處不能用full/empty 這兩種狀態不能代表隊列所有狀態
	// 但notFull/notEmpty卻可以代表取放過程中佇列所有的狀態
	final Condition notFull = lock.newCondition();
	final Condition notEmpty = lock.newCondition();
	// 此處不能使用一個condition完成 比如說有五個執行緒把佇列放滿了
	// 此時有一個取執行緒取走了一個 喚醒了其中一個放執行緒 則有一個放執行緒被執行
	// 執行完又要喚醒一個放執行緒 這個執行緒在執行時佇列已經滿了 不能再放了
	// 有兩個condition就能區分是喚醒取執行緒還是放執行緒

	final Object[] items = new Object[100];
	int putptr, takeptr, count;

	public void put(Object x) throws InterruptedException {
		lock.lock();
		try {
			while (count == items.length)
				notFull.await();// 滿了就要把此執行緒放到"不滿"佇列(虛擬)去等 意思是它在等待陣列不滿的時候
			items[putptr] = x;
			if (++putptr == items.length)
				putptr = 0;
			++count;
			notEmpty.signal();// 只喚醒在"不空"佇列(虛擬)阻塞的取執行緒 喚醒的肯定是在等待陣列不空的執行緒
		} finally {
			lock.unlock();
		}
	}

	public Object take() throws InterruptedException {
		lock.lock();
		try {
			while (count == 0)
				notEmpty.await();
			Object x = items[takeptr];
			if (++takeptr == items.length)
				takeptr = 0;
			--count;
			notFull.signal();// 只喚醒放執行緒 能放肯定不滿
			return x;
		} finally {
			lock.unlock();
		}
	}
}
// (The ArrayBlockingQueue class provides this functionality,
// so there is no reason to implement this sample usage class.)
/** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

據此,我們可以改變“子執行緒迴圈10次,接著主執行緒迴圈100,接著又回到子執行緒迴圈10次,接著再回到主執行緒又迴圈100,如此迴圈50次,請寫出程式”的例子,這個例子是兩個執行緒之間的跳轉,那麼如果實現三個執行緒之間的輪循,比如:執行緒1迴圈10,執行緒2迴圈100,執行緒3迴圈20次,然後又是執行緒1,接著執行緒2...一直輪循50次.

package javaplay.thread.test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreeConditionCommunication {
	public static void main(String[] args) {
		final Business business = new Business();
		new Thread(new Runnable() {

			@Override
			public void run() {
				for (int i = 0; i < 5; i++) {
					business.sub2(i);
				}

			}

		}).start();
		new Thread(new Runnable() {

			@Override
			public void run() {
				for (int i = 0; i < 5; i++) {
					business.sub3(i);
				}

			}

		}).start();

		for (int i = 0; i < 5; i++) {
			business.main(i);
		}

	}

	static class Business {
		Lock lock = new ReentrantLock();
		Condition condition1 = lock.newCondition();
		Condition condition2 = lock.newCondition();
		Condition condition3 = lock.newCondition();
		private int bShouldSub = 1;

		public void sub2(int i) {
			lock.lock();
			try {
				while (bShouldSub != 2) {
					try {
						condition2.await();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				for (int j = 0; j < 1; j++) {
					System.out.println("sub2 thread sequence of " + j + " ,loop of " + i);
				}
				bShouldSub = 3;
				condition3.signal();
			} finally {
				lock.unlock();
			}
		}

		public void sub3(int i) {
			lock.lock();
			try {
				while (bShouldSub != 3) {
					try {
						condition3.await();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				for (int j = 0; j < 1; j++) {
					System.out.println("sub3 thread sequence of " + j + " ,loop of " + i);
				}
				bShouldSub = 1;
				condition1.signal();
			} finally {
				lock.unlock();
			}
		}

		public void main(int i) {
			lock.lock();
			try {
				while (bShouldSub != 1) {
					try {
						condition1.await();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				for (int j = 0; j < 1; j++) {
					System.out.println("main thread sequence of " + j + " ,loop of " + i);
				}
				bShouldSub = 2;
				condition2.signal();
			} finally {
				lock.unlock();
			}

		}
	}

}

據此可假想的認為,執行緒在阻塞時類似人去銀行取錢排隊等候等待被叫號(喚醒),排的隊也不只有一個,業務不一樣排的隊名也不一樣;

多執行緒爭奪的鎖可以是任何物件,執行緒阻塞也要有在哪個物件上阻塞,this.notify只是喚醒在等待this物件這把鎖的執行緒佇列中的其中一個執行緒,很難理解麼?多寫幾遍!

當一個執行緒執行到wait()方法時,它就進入到一個和該物件相關的等待池中,同時失去(釋放)了物件的機鎖(暫時失去機鎖,wait(long timeout)超時時間到後還需要返還物件鎖);如果能深刻理解,所謂的等待與喚醒都是要指定隊名的(或者執行緒池名),即在哪個物件相關的等待佇列裡面等待和在哪個物件相關的等待佇列裡喚醒執行緒,不能站錯隊;