Java執行緒(九):Condition-執行緒通訊更高效的方式
接近一週沒更新《Java執行緒》專欄了,主要是這周工作上比較忙,生活上也比較忙,呵呵,進入正題,上一篇講述了併發包下的Lock,Lock可以更好的解決執行緒同步問題,使之更面向物件,並且ReadWriteLock在處理同步時更強大,那麼同樣,執行緒間僅僅互斥是不夠的,還需要通訊,本篇的內容是基於上篇之上,使用Lock如何處理執行緒通訊。
那麼引入本篇的主角,Condition,Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意 Lock 實現組合使用,為每個物件提供多個等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。下面將之前寫過的一個執行緒通訊的例子替換成用Condition實現(Java執行緒(三)),程式碼如下:
在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統執行緒的通訊方式,Condition都可以實現,這裡注意,Condition是被繫結到Lock上的,要建立一個Lock的Condition必須用newCondition()方法。public class ThreadTest2 { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { threadExecute(business, "sub"); } }).start(); threadExecute(business, "main"); } public static void threadExecute(Business business, String threadType) { for(int i = 0; i < 100; i++) { try { if("main".equals(threadType)) { business.main(i); } else { business.sub(i); } } catch (InterruptedException e) { e.printStackTrace(); } } } } class Business { private boolean bool = true; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public /*synchronized*/ void main(int loop) throws InterruptedException { lock.lock(); try { while(bool) { condition.await();//this.wait(); } for(int i = 0; i < 100; i++) { System.out.println("main thread seq of " + i + ", loop of " + loop); } bool = true; condition.signal();//this.notify(); } finally { lock.unlock(); } } public /*synchronized*/ void sub(int loop) throws InterruptedException { lock.lock(); try { while(!bool) { condition.await();//this.wait(); } for(int i = 0; i < 10; i++) { System.out.println("sub thread seq of " + i + ", loop of " + loop); } bool = false; condition.signal();//this.notify(); } finally { lock.unlock(); } } }
這樣看來,Condition和傳統的執行緒通訊沒什麼區別,Condition的強大之處在於它可以為多個執行緒間建立不同的Condition,下面引入API中的一段程式碼,加以說明。
這是一個處於多執行緒工作環境下的快取區,快取區提供了兩個方法,put和take,put是存資料,take是取資料,內部有個快取佇列,具體變數和方法說明見程式碼,這個快取區類實現的功能:有多個執行緒往裡面存資料和從裡面取資料,其快取佇列(先進先出後進後出)能快取的最大數值是100,多個執行緒間是互斥的,當快取佇列中儲存的值達到100時,將寫執行緒阻塞,並喚醒讀執行緒,當快取佇列中儲存的值為0時,將讀執行緒阻塞,並喚醒寫執行緒,這也是ArrayBlockingQueue的內部實現。下面分析一下程式碼的執行過程:class BoundedBuffer { final Lock lock = new ReentrantLock();//鎖物件 final Condition notFull = lock.newCondition();//寫執行緒條件 final Condition notEmpty = lock.newCondition();//讀執行緒條件 final Object[] items = new Object[100];//快取佇列 int putptr/*寫索引*/, takeptr/*讀索引*/, count/*佇列中存在的資料個數*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//如果佇列滿了 notFull.await();//阻塞寫執行緒 items[putptr] = x;//賦值 if (++putptr == items.length) putptr = 0;//如果寫索引寫到佇列的最後一個位置了,那麼置為0 ++count;//個數++ notEmpty.signal();//喚醒讀執行緒 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//如果佇列為空 notEmpty.await();//阻塞讀執行緒 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到佇列的最後一個位置了,那麼置為0 --count;//個數-- notFull.signal();//喚醒寫執行緒 return x; } finally { lock.unlock(); } } }
1. 一個寫執行緒執行,呼叫put方法;
2. 判斷count是否為100,顯然沒有100;
3. 繼續執行,存入值;
4. 判斷當前寫入的索引位置++後,是否和100相等,相等將寫入索引值變為0,並將count+1;
5. 僅喚醒讀執行緒阻塞佇列中的一個;
6. 一個讀執行緒執行,呼叫take方法;
7. ……
8. 僅喚醒寫執行緒阻塞佇列中的一個。
這就是多個Condition的強大之處,假設快取佇列中已經存滿,那麼阻塞的肯定是寫執行緒,喚醒的肯定是讀執行緒,相反,阻塞的肯定是讀執行緒,喚醒的肯定是寫執行緒,那麼假設只有一個Condition會有什麼效果呢,快取佇列中已經存滿,這個Lock不知道喚醒的是讀執行緒還是寫執行緒了,如果喚醒的是讀執行緒,皆大歡喜,如果喚醒的是寫執行緒,那麼執行緒剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。