深入理解Java 執行緒通訊
當執行緒在系統內執行時,執行緒的排程具有一定的透明性,程式通常無法準確控制執行緒的輪換執行,但 Java 也提供了一些機制來保證執行緒協調執行。
傳統的執行緒通訊
假設現在系統中有兩個執行緒,這兩個執行緒分別代表存款者和取錢者——現在假設系統有一種特殊的要求,系統要求存款者和取錢者不斷地重複存款、取錢的動作,而且要求每當存款者將錢存入指定賬戶後,取錢者就立即取出該筆錢。不允許存款者連續兩次存錢,也不允許取錢者連續兩次取錢。
為了實現這種功能,可以藉助於 Object 類提供的 wait()、 notify() 和 notifyAll() 三個方法,這三個方法並不屬於 Thread 類,而是屬於 Object 類。但這三個方法必須由同步監視器物件來呼叫,這可分成以下兩種情況。
- 對於使用 synchronized 修飾的同步方法,因為該類的預設例項(this)就是同步監視器,所以可以在同步方法中直接呼叫這三個方法。
- 對於使用 synchronized 修飾的同步程式碼塊,同步監視器是 synchronized 後括號裡的物件,所以必須使用該物件呼叫這三個方法。
關於這三個方法的解釋如下。
- wait():導致當前執行緒等待,直到其他執行緒呼叫該同步監視器的 notify() 方法或 notifyAll() 方法來喚醒該執行緒。該 wait() 方法有三種形式——無時間引數的 wait (—直等待,直到其他執行緒通知 )、帶毫秒引數的 wait() 和帶毫秒、毫微秒引數的 wait() (這兩種方法都是等待指定時間後自動甦醒)。呼叫 wait() 方法的當前執行緒會釋放對該同步監視器的鎖定。
- notify():喚醒在此同步監視器上等待的單個執行緒。如果所有執行緒都在此同步監視器上等待,則會選擇喚醒其中一個執行緒。選擇是任意性的。只有當前執行緒放棄對該同步監視器的鎖定後(使用 wait() 方法),才可以執行被喚醒的執行緒。
- notifyAll():喚醒在此同步監視器上等待的所有執行緒。只有當前執行緒放棄對該同步監視器的鎖定後,才可以執行被喚醒的執行緒。
程式中可以通過一個旗標來標識賬戶中是否已有存款,當旗標為 false 時,表明賬戶中沒有存款,存款者執行緒可以向下執行,當存款者把錢存入賬戶後,將旗標設為 true ,並呼叫 notify() 或 notifyAll() 方法來喚醒其他執行緒;當存款者執行緒進入執行緒體後,如果旗標為 true 就呼叫 wait() 方法讓該執行緒等待。
當旗標為 true 時,表明賬戶中已經存入了存款,則取錢者執行緒可以向下執行,當取錢者把錢從賬戶中取出後,將旗標設為 false ,並呼叫 notify() 或 notifyAll() 方法來喚醒其他執行緒;當取錢者執行緒進入執行緒體後,如果旗標為 false 就呼叫 wait() 方法讓該執行緒等待。
本程式為 Account 類提供 draw() 和 deposit() 兩個方法,分別對應該賬戶的取錢、存款等操作,因為這兩個方法可能需要併發修改 Account 類的 balance 成員變數的值,所以這兩個方法都使用 synchronized 修飾成同步方法。除此之外,這兩個方法還使用了 wait() 和 notifyAll() 來控制執行緒的協作。
public class Account{ private String accountNo; private double balance; //標識賬戶中是否已有存款的旗標 private boolean flag = false; public Account(){} public Account(String accountNo,double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public synchronized void draw(double drawAmount){ try{ //如果flag為假,表明賬戶中還沒有人存錢進去,則取錢方法阻塞 if (!flag){ wait(); }else{ //執行取錢 System.out.println(Thread.currentThread().getName() + " 取錢:" + drawAmount); balance -= drawAmount; System.out.println("賬戶餘額為:" + balance); //將標識賬戶是否已有存款的旗標設為false。 flag = false; //喚醒其他執行緒 notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public synchronized void deposit(double depositAmount){ try{ //如果flag為真,表明賬戶中已有人存錢進去,則存錢方法阻塞 if (flag){ // ① wait(); }else{ //執行存款 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("賬戶餘額為:" + balance); //將表示賬戶是否已有存款的旗標設為true flag = true; //喚醒其他執行緒 notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; } }
上面程式中的粗體字程式碼使用 wait() 和 notifyAll() 進行了控制,對存款者執行緒而言,當程式進入 deposit() 方法後,如果 flag 為 true ,則表明賬戶中已有存款,程式呼叫 wait() 方法阻塞;否則程式向下執行存款操作,當存款操作執行完成後,系統將 flag 設為 true,然後呼叫 notifyAll() 來喚醒其他被阻塞的執行緒——如果系統中有存款者執行緒,存款者執行緒也會被喚醒,但該存款者執行緒執行到①號程式碼處時再次進入阻塞狀態,只有執行 draw() 方法的取錢者執行緒才可以向下執行。同理,取錢者執行緒的執行流程也是如此。
程式中的存款者執行緒迴圈100次重複存款,而取錢者執行緒則迴圈100次重複取錢,存款者執行緒和取錢者執行緒分別呼叫 Account 物件的 deposit()、 draw() 方法來實現。
public class DrawThread extends Thread{ //模擬使用者賬戶 private Account account; //當前取錢執行緒所希望取的錢數 private double drawAmount; public DrawThread(String name,Account account,double drawAmount){ super(name); this.account = account; this.drawAmount = drawAmount; } //重複100次執行取錢操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.draw(drawAmount); } } }
public class DepositThread extends Thread{ //模擬使用者賬戶 private Account account; //當前取錢執行緒所希望存款的錢數 private double depositAmount; public DepositThread(String name,double depositAmount){ super(name); this.account = account; this.depositAmount = depositAmount; } //重複100次執行存款操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.deposit(depositAmount); } } }
主程式可以啟動任意多個存款執行緒和取錢執行緒,可以看到所有的取錢執行緒必須等存款執行緒存錢後才可以向下執行,而存款執行緒也必須等取錢執行緒取錢後才可以向下執行。主程式程式碼如下。
public class TestDraw{ public static void main(String[] args){ //建立一個賬戶 Account acct = new Account("1234567",0); new DrawThread("取錢者",acct,800).start(); new DepositThread("存款者甲",800).start(); new DepositThread("存款者乙",800).start(); new DepositThread("存款者丙",800).start(); } }
執行該程式,可以看到存款者執行緒、取錢者執行緒交替執行的情形,每當存款者向賬戶中存入800元之後,取錢者執行緒立即從賬戶中取出這筆錢。存款完成後賬戶餘額總是800元,取錢結束後賬戶餘額總是0元。執行該程式,會看到如下圖所示的結果。
從上圖中可以看出 , 3個存款者執行緒隨機地向賬戶中存款,只有1個取錢者執行緒執行取錢操作。只有當取錢者取錢後,存款者才可以存款;同理,只有等存款者存款後,取錢者執行緒才可以取錢。
上圖顯示程式最後被阻塞無法繼續向下執行,這因為3個存款者執行緒共有300次存款操作,但1個取錢者執行緒只有100次取錢操作,所以程式最後被阻塞。
注意:上圖所示的阻塞並不是死鎖,對於這種情況,取錢者執行緒已經執行結束,而存款者執行緒只是在等待其他執行緒來取錢而已,並不是等待其他執行緒釋放同步監視器。不要把死鎖和程式阻塞等同起來!
使用 Condition 控制執行緒通訊
如果程式不使用 synchronized 關鍵字來保證同步,而是直接使用 Lock 物件來保證同步,則系統中不存在隱式的同步監視器,也就不能使用 wait()、notify()、notifyAll() 方法進行執行緒通訊了。
當使用 Lock 物件來保證同步時,Java 提供了一個 Condition 類來保持協調,使用 Condition 可以讓那些已經得到 Lock 物件卻無法繼續執行的執行緒釋放 Lock 物件,Condition 物件也可以喚醒其他處於等待的執行緒。
Condition 將同步監視器方法(wait()、notify() 和 notifyAll() )分解成截然不同的物件,以便通過將這些物件與 Lock 物件組合使用,為每個物件提供多個等待集(wait-set)。在這種情況下,Lock 替代了同步方法或同步程式碼塊,Condition 替代了同步監視器的功能。
Condition 例項被繫結在一個 Lock 物件上。要獲得特定 Lock 例項的 Condition 例項,呼叫 Lock 物件的 newCondition() 方法即可。Condition 類提供瞭如下三個方法。
- await():類似於隱式同步監視器上的 wait() 方法,導致當前執行緒等待,直到其他執行緒呼叫該 Condition 的 signal() 方法或 signalAll() 方法來喚醒該執行緒。該 await() 方法有更多變體,如 long awaitNanos(long nanosTimeout)、 void awaitUninterruptibly() 、 awaitUntil(Date deadline) 等,可以完成更豐富的等待操作。
- signal():喚醒在此 Lock 物件上等待的單個執行緒。如果所有執行緒都在該 Lock 物件上等待,則會選擇喚醒其中一個執行緒。選擇是任意性的。只有當前執行緒放棄對該 Lock 物件的鎖定後(使用 await() 方法),才可以執行被喚醒的執行緒。
- signalAll():喚醒在此 Lock 物件上等待的所有執行緒。只有當前執行緒放棄對該 Lock 物件的鎖定後,才可以執行被喚醒的執行緒。
下面程式中 Account 使用 Lock 物件來控制同步,並使用 Condition 物件來控制執行緒的協調執行。
public class Account{ //顯示定義Lock物件 private final Lock lock = new ReentrantLock(); //獲得指定Lock物件對應的條件變數 private final Condition cond = lock.newCondition(); private String accountNo; private double balance; //標識賬戶中是否已經存款的旗標 private boolean flag = false; public Account(){} public Account(String accountNo,double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public void draw(double drawAmount){ //加鎖 lock.lock(); try{ //如果賬戶中還沒有存入存款,該執行緒等待 if (!flag){ cond.await(); }else{ //執行取錢操作 System.out.println(Thread.currentThread().getName() + " 取錢:" + drawAmount); balance -= drawAmount; System.out.println("賬戶餘額為:" + balance); //將標識是否成功存入存款的旗標設為false flag = false; //喚醒該Lock物件對應的其他執行緒 cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來確保釋放鎖 finally{ lock.unlock(); } } public void deposit(double depositAmount){ lock.lock(); try{ //如果賬戶中已經存入了存款,該執行緒等待 if(flag){ cond.await(); }else{ //執行存款操作 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("賬戶餘額為:" + balance); //將標識是否成功存入存款的旗標設為true flag = true; //喚醒該Lock物件對應的其他執行緒 cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來確保釋放鎖 finally{ lock.unlock(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; } }
顯式地使用 Lock 物件來充當同步監視器,則需要使用 Condition 物件來暫停、喚醒指定執行緒。存取錢的程式碼和最上面相同。
使用阻塞佇列(BlockingQueue)控制執行緒通訊
Java5 提供了一個 BlockingQueue 介面,雖然 BlockingQueue 也是 Queue 的子介面,但它的主要用途並不是作為容器,而是作為執行緒同步的工具。 BlockingQueue 具有一個特徵:當生產者執行緒試圖向 BlockingQueue 中放入元素時,如果該佇列已滿,則該執行緒被阻塞;當消費者執行緒試圖從 BlockingQueue 中取出元素時,如果該佇列已空,則該執行緒被阻塞。
程式的兩個執行緒通過交替向 BlockingQueue 中放入元素、取出元素,即可很好地控制執行緒的通訊。BlockingQueue 提供如下兩個支援阻塞的方法。
- put(E e):嘗試把 E 元素放入 BlockingQueue 中,如果該佇列的元素己滿,則阻塞該執行緒。
- take():嘗試從 BlockingQueue 的頭部取出元素,如果該佇列的元素已空,則阻塞該執行緒。
BlockingQueue 繼承了 Queue 介面,當然也可使用 Queue 介面中的方法。這些方法歸納起來可分為如下三組。
- 在佇列尾部插入元素。包括 add(E e)、offer(E e) 和 put(E e) 方法,當該佇列已滿時,這三個方法分別會丟擲異常、返回 false 、阻塞佇列。
- 在佇列頭部刪除並返回刪除的元素。包括 remove()、 poll() 和 take() 方法。當該佇列已空時,這三個方法分別會丟擲異常、返回 false 、阻塞佇列。
- 在佇列頭部取出但不刪除元素。包括 element() 和 peek() 方法,當佇列已空時,這兩個方法分別丟擲異常、返回 false
BlockingQueue 包含的方法之間的對應關係如下表所示:
BlockingQueue 與其實現類之間的類圖如下圖所示。
上圖中以黑色方框框出的都是 Java7 新增的阻塞佇列。可以看到 , BlockingQueue 包含如下5個實現類。
- ArrayBlockingQueue:基於陣列實現的 BlockingQueue 佇列。
- LinkedBlockingQueue:基於連結串列實現的 BlockingQueue 佇列。
- PriorityBlockingQueue:它並不是標準的阻塞佇列。與前面介紹的 PriorityQueue 類似,該佇列呼叫 remove()、poll()、take() 等方法取出元素時,並不是取出佇列中存在時間最長的元素,而是佇列中最小的元素。 PriorityBlockingQueue 判斷元素的大小即可根據元素(實現 Comparable 介面)的本身大小來自然排序,也可使用 Comparator 進行定製排序。
- SynchronousQueue:同步佇列。對該佇列的存、取操作必須交替進行。
- DelayQueue:它是一個特殊的 BlockingQueue ,底層基於 PriorityBlockingQueue 實現。不過,DelayQueue 要求集合元素都實現 Delay 介面(該接口裡只有一個 long getDelay() 方法),DelayQueue 根據集合元素的 getDalay() 方法的返回值進行排序。
下面以 ArrayBlockingQueue 為例介紹阻塞佇列的功能和用法。下面先用一個最簡單的程式來測試 BlockingQueue 的 put() 方法。
public class BlockingQueueTest { public static void main(String[] args) throws Exception { BlockingQueue<String> bq = new ArrayBlockingQueue<>(2); bq.put("Java"); // 與bq.add("Java")、bq.offer("Java") 相同 bq.put("Java"); // 與bq.add("Java")、bq.offer("Java") 相同 bq.put("Java"); // ① 阻塞執行緒 } }
上面程式先定義一個大小為2的 BlockingQueue,程式先向該佇列中放入兩個元素,此時佇列還沒有滿,兩個元紊都可以放入,因此使用 put()、add() 和 offer() 方法效果完全一樣。當程式試圖放入第三個元素時,如果使用 put() 方法嘗試放入元素將會阻寒執行緒,如上面程式①號程式碼所示。如果使用 add() 方法嘗試放入元素將會引發異常;如果使用 offer() 方法嘗試放入元素則會返回 false,元素不會被放入。
與此類似的是,在 BlockingQueue 已空的情況下,程式使用 take() 方法嘗試取出元素將會阻塞執行緒:使用 remove() 方法嘗試取出元素將引發異常:使用 poll() 方法嘗試取出元素將返回 false,元索不會被刪除。
掌握了 BlodcingQuene 阻塞佇列的特性之後,下面程式就可以利用 BlockingQueue 來實現執行緒通訊了。
public class Producer extends Thread { private BlockingQueue<String> bq; public Producer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { String[] strArr = new String[] { "Java","Struts","Spring" }; for(int i=0;i<99999999;i++) { System.out.println(getName()+"生產者準備生產集合元素"); try { Thread.sleep(200); // 嘗試放入元素,如果佇列已滿,則執行緒被阻塞 bq.put(strArr[i%3]); }catch(Exception ex) { ex.printStackTrace(); } System.out.println(getName()+"生產完成:"+bq); } } } public class Consumer extends Thread { private BlockingQueue<String> bq; public Consumer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { while(true) { System.out.println(getName()+"消費者準備消費集合元素!"); try { Thread.sleep(200); // 嘗試取出元素,如果佇列已空,則執行緒被阻塞 bq.take(); }catch(Exception ex) { ex.printStackTrace(); } System.out.println(getName()+"消費完成:"+bq); } } } public class BlockingQueueTest2 { public static void main(String[] args) { // 建立一個容量為1的BlockingQueue BlockingQueue<String> bq = new ArrayBlockingQueue<>(1); // 啟動3個生產者執行緒 new Producer(bq).start(); new Producer(bq).start(); new Producer(bq).start(); // 啟動一個消費者執行緒 new Consumer(bq).start(); } }
上面程式啟動了 3個生產者執行緒向 BlockingQueue 集合放入元素,啟動了 1個消費者執行緒從 BlockingQueue 集合取出元素。本程式的 BlockingQueue 集合容量為1,因此3個生產者執行緒無法連續放入元素,必須等待消費者執行緒取出一個元素後 , 3個生產者執行緒的其中之一才能放入一個元素。執行該程式,會看到如下圖所示的結果。
從上圖可以看出,3個生產者執行緒都想向 BlockingQueue 中放入元素,但只要其中一個執行緒向該佇列中放入元素之後,其他生產者執行緒就必須等待,等待消費者執行緒取出 BlockingQueue 佇列裡的元素。
以上就是深入理解Java 執行緒通訊的詳細內容,更多關於Java 執行緒通訊的資料請關注我們其它相關文章!