執行緒之間的通訊方式:wait/notify
1.什麼是執行緒間的通訊
通訊,顧名思義就是一種通知交通的方式,在多執行緒的環境下,如果各個執行緒之間可以互相通訊的話,可以很好地提高工作效率,提高CPU的利用率。
2.執行緒間常用的通訊方式
多執行緒間的通訊一般採取等待/通知機制進行實現,即Object類中的wait()和notify()方法實現的,一個是等待,一個是通知。其實就像我們平時去營業廳辦理業務一樣,我們要先取號,然後就開始等待,等到聽到叫我們號的時候,我們再過去辦理業務。
- Java中等待/通知機制的實現
- 如上面所說的,wait()和notify()這兩個方法都是Object類中的方法,之所以是超類的方法,其實是因為之前我們說過任何物件都可以作為鎖,而這兩個方法都是由鎖呼叫的,所以很自然地就可以理解為什麼這兩個方法是屬於超類的。
- wait方法:
- 作用是使當前執行程式碼的執行緒進行等待,該方法會將該執行緒放入”等待佇列“中,並且在wait()所在的程式碼處停止執行,直到接到通知或被中斷為止。
- 在呼叫 wait() 之前,執行緒必須獲得該物件級別鎖,即只能在同步方法或同步塊中呼叫 wait() 方法。
- wait() 是釋放鎖的,即在執行到 wait() 方法之後,當前執行緒會釋放鎖,當從 wait() 方法返回前,執行緒與其他執行緒競爭重新獲得鎖
- 此外,還有帶一個引數的wait(long),表示在等待一段時間內,如果沒有喚醒執行緒,則會自動喚醒。當然,在這段時間內,也可以由其他執行緒喚醒。
- notify方法:
- 和 wait() 方法一樣, notify() 方法也要在同步塊或同步方法中呼叫,即在呼叫前,執行緒也必須獲得該物件的物件級別鎖。
- 該方法用來通知那些可能等待該物件的物件鎖的其他執行緒,如果有多個執行緒等待,則由執行緒規劃器隨機挑選出其中一個是wait狀態的執行緒,對其發出通知notify,並使它等待獲取該物件的物件鎖。即notify方法一次只隨機喚醒一個wait狀態的執行緒。
- 這裡需要注意的是,執行notify方法之後,當前執行緒不會立即釋放其擁有的該物件鎖,而是執行完之後才會釋放該物件鎖,被通知的執行緒也不會立即獲得物件鎖,而是等待notify方法執行完之後,釋放了該物件鎖,才可以獲得該物件鎖。
- notifyAll() 通知所有等待同一共享資源的全部執行緒從等待狀態退出,進入可執行狀態,重新競爭獲得物件鎖。即notifyAll方法可以喚醒所有wait狀態的執行緒。
- wait()/notify()總結:
用一句話來說就是:wait使執行緒停止執行,notify使停止的執行緒繼續執行 。
每個鎖物件都有兩個佇列,一個是就緒佇列,一個是阻塞佇列。就緒佇列儲存了將要獲得鎖的執行緒,阻塞佇列儲存了被阻塞的執行緒。一個執行緒被喚醒之後,才會進入就緒佇列,等待CPU的排程;反之,一個執行緒呼叫wait方法後,就會進入阻塞佇列,等待下一次被喚醒。
- 要結合synchronized關鍵字一起使用,因為他們都需要首先獲取該物件的物件鎖;
- wait方法是釋放鎖,notify方法是不釋放鎖的;
- 執行緒的四種狀態如下圖:
- wait/notify執行緒間通訊示例程式碼
- Mylist程式碼:
public class MyList { private static List list = new ArrayList(); public static void add() { list.add("我是元素"); } public static int size() { return list.size(); } }
- 執行緒A:
public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
- 執行緒B:
public class ThreadB extends Thread { private Object lock; public ThreadB(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已發出通知!"); } System.out.println("添加了" + (i + 1) + "個元素!"); Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
- 測試程式碼:
public class Run { public static void main(String[] args) { try { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); Thread.sleep(50); ThreadB b = new ThreadB(lock); b.start(); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 執行結果:
wait begin 1507634541467 添加了1個元素! 添加了2個元素! 添加了3個元素! 添加了4個元素! 已發出通知! 添加了5個元素! 添加了6個元素! 添加了7個元素! 添加了8個元素! 添加了9個元素! 添加了10個元素! wait end 1507634551563
由上面可以看出,雖然執行緒B在第五個元素的時候發出通知,而執行緒A實現執行緒B執行完之後才獲得物件鎖,這也可以明,wait方法是釋放鎖的而notify方法是不釋放鎖的。因為如果notify方法會釋放鎖的話,那麼應該在列印通知之前就執行執行緒A中的列印wait end。
- Mylist程式碼:
-
使用wait/notify模擬BlockingQueue阻塞佇列
-
BlockingQueue是阻塞佇列,我們需要實現的是阻塞的放入和得到資料,設計思路如下:
(1)初始化佇列最大長度為5;
(2)需要新加入的時候,判斷是否長度為5,如果是5則等待插入;
(3)需要消費元素的時候,判斷是否為0,如果是0則等待消費; -
實現程式碼如下:
public class MyQueue { //1、需要一個承裝元素的集合 private final LinkedList<Object> list = new LinkedList<>(); //2、需要一個計數器 private final AtomicInteger count = new AtomicInteger(0); //3、需要指定上限和下限 private final int maxSize = 5; private final int minSize = 0; //5、初始化鎖物件 private final Object lock = new Object(); /** * put方法 */ public void put(Object obj) { synchronized (lock) { //達到最大無法新增,進入等到 while (count.get() == maxSize) { try { lock.wait(); }catch (InterruptedException e) { e.printStackTrace(); } } list.add(obj); //加入元素 count.getAndIncrement(); //計數器增加 System.out.println(" 元素 " + obj + " 被新增 "); lock.notify(); //通知另外一個阻塞的執行緒方法 } } /** * get方法 */ public Object get() { Object temp; synchronized (lock) { //達到最小,沒有元素無法消費,進入等待 while (count.get() == minSize) { try { lock.wait(); }catch (InterruptedException e) { e.printStackTrace(); } } count.getAndDecrement(); temp = list.removeFirst(); System.out.println(" 元素 " + temp + " 被消費 "); lock.notify(); } return temp; } private int size() { return count.get(); } public static void main(String[] args) throws Exception { final MyQueue myQueue = new MyQueue(); initMyQueue(myQueue); Thread t1 = new Thread(() -> { myQueue.put("h"); myQueue.put("i"); }, "t1"); Thread t2 = new Thread(() -> { try { Thread.sleep(2000); myQueue.get(); Thread.sleep(2000); myQueue.get(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2"); t1.start(); Thread.sleep(1000); t2.start(); } private static void initMyQueue(MyQueue myQueue) { myQueue.put("a"); myQueue.put("b"); myQueue.put("c"); myQueue.put("d"); myQueue.put("e"); System.out.println("當前元素個數:" + myQueue.size()); } }
-
執行結果:
元素 a 被新增 元素 b 被新增 元素 c 被新增 元素 d 被新增 元素 e 被新增 當前元素個數:5 元素 a 被消費 元素 h 被新增 元素 b 被消費 元素 i 被新增
注意:在資料結構中,佇列是可以無長度限制的,就是可以無限擴充套件,但是對於阻塞佇列,他之所以稱之為阻塞佇列就是因為其有長度限制,也是上述例項中的maxSize,這也是常見的筆試面試題中比較容易忽略的一個地方,想當然的認為只要是佇列他就是無長度限制的,看到這裡你應該知道了Java中提供的阻塞佇列的類是有長度限制的!
-
當呼叫wait方法的時候,wait方法所在的程式碼塊停止執行,直到被notify喚醒才開始執行。所以這裡的get和put中都有wait和notify方法,可以理解為相互制約和喚醒。
-
-
-
注意事項:
-
wait()和notify()方法要在同步塊或同步方法中呼叫,即在呼叫前,執行緒也必須獲得該物件的物件級別鎖。
-
wait方法是釋放鎖,notify方法是不釋放鎖的;
-
notify每次喚醒wait等待狀態的執行緒都是隨機的,且每次只喚醒一個;
-
notifAll每次喚醒wait等待狀態的執行緒使之重新競爭獲取物件鎖,優先順序最高的那個執行緒會最先執行;
-
當執行緒處於wait()狀態時,呼叫執行緒物件的interrupt()方法會出現InterruptedException異常;
-
3.其他通訊方式
(1)程序間的通訊方式:
管道(pipe)、有名管道(named pipe)、訊號量(semophore)、訊息佇列(message
queue)、訊號(signal)、共享記憶體(shared memory)、套接字(socket);
(2)執行緒程間的通訊方式:
1、鎖機制:
1.1 互斥鎖:提供了以排它方式阻止資料結構被併發修改的方法。
1.2 讀寫鎖:允許多個執行緒同時讀共享資料,而對寫操作互斥。
1.3 條件變數:可以以原子的方式阻塞程序,直到某個特定條件為真為止。
對條件測試是在互斥鎖的保護下進行的。條件變數始終與互斥鎖一起使用。
2、訊號量機制:包括無名執行緒訊號量與有名執行緒訊號量
3、訊號機制:類似於程序間的訊號處理。
執行緒間通訊的主要目的是用於執行緒同步,所以執行緒沒有像程序通訊中用於資料交換的
通訊機制。
徐劉根大佬的多執行緒專欄:https://blog.csdn.net/column/details/17790.html