7.併發程式設計--多執行緒通訊-wait-notify
併發程式設計--多執行緒通訊-wait-notify
多執行緒通訊:執行緒通訊的目的是為了能夠讓執行緒之間相互發送訊號;
1. 多執行緒通訊:
執行緒通訊的目的是為了能夠讓執行緒之間相互發送訊號。另外,執行緒通訊還能夠使得執行緒等待其它執行緒的訊號,比如,執行緒B可以等待執行緒A的訊號,這個訊號可以是執行緒A已經處理完成的訊號;
Object提供了三個方法wait(), notify(), notifyAll()線上程之間進行通訊,以此來解決執行緒間執行順序等問題。
- * wait():釋放當前執行緒的同步監視控制器,並讓當前執行緒進入阻塞狀態,直到別的執行緒發出notify將該執行緒喚醒。
- * notify():喚醒在等待控制監視器的其中一個執行緒(隨機)。只有當前執行緒釋放了同步監視器鎖(呼叫wait)之後,被喚醒的執行緒才有機會執行。
- * notifyAll():與上面notify的區別是同時喚醒多個等待執行緒。
值得注意的是這三個方法是屬於Object而不是屬於Thread的,但是呼叫的時候必須用同步監視器來呼叫,wait(), notify(), notifyAll() 必須和synchronized關鍵字聯合使用
模擬執行緒通訊:自定義實現的通訊模式
示例:ListAdd1.java
1 public class ListAdd1 { 2private volatile static List list = new ArrayList(); 3 4 public void add(){ 5 list.add("bjsxt"); 6 } 7 public int size(){ 8 return list.size(); 9 } 10 11 public static void main(String[] args) { 12 13 final ListAdd1 list1 = new ListAdd1();14 15 Thread t1 = new Thread(new Runnable() { 16 @Override 17 public void run() { 18 try { 19 for(int i = 0; i <10; i++){ 20 list1.add(); 21 System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素.."); 22 Thread.sleep(500); 23 } 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 }, "t1"); 29 30 Thread t2 = new Thread(new Runnable() { 31 @Override 32 public void run() { 33 while(true){ 34 if(list1.size() == 5){ 35 System.out.println("當前執行緒收到通知:" + Thread.currentThread().getName() + " list size = 5 執行緒停止.."); 36 throw new RuntimeException(); 37 } 38 } 39 } 40 }, "t2"); 41 42 t1.start(); 43 t2.start(); 44 } 45 }
2. 使用JDK的 Object提供了三個方法wait(), notify(), notifyAll()線上程之間進行通訊
示例:
1 import java.util.ArrayList; 2 import java.util.List; 3 import java.util.Queue; 4 import java.util.concurrent.CountDownLatch; 5 import java.util.concurrent.LinkedBlockingDeque; 6 import java.util.concurrent.LinkedBlockingQueue; 7 /** 8 * wait notfiy 方法,wait釋放鎖,notfiy不釋放鎖 9 * @@author Maozw 10 * 11 */ 12 public class ListAdd2 { 13 private volatile static List list = new ArrayList(); 14 15 public void add(){ 16 list.add("bjsxt"); 17 } 18 public int size(){ 19 return list.size(); 20 } 21 22 public static void main(String[] args) { 23 24 final ListAdd2 list2 = new ListAdd2(); 25 26 // 1 例項化出來一個 lock 27 // 當使用wait 和 notify 的時候 , 一定要配合著synchronized關鍵字去使用 28 //final Object lock = new Object(); 29 30 final CountDownLatch countDownLatch = new CountDownLatch(1); 31 32 Thread t1 = new Thread(new Runnable() { 33 @Override 34 public void run() { 35 try { 36 //synchronized (lock) { 37 for(int i = 0; i <10; i++){ 38 list2.add(); 39 System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素.."); 40 Thread.sleep(500); 41 if(list2.size() == 5){ 42 System.out.println("已經發出通知.."); 43 countDownLatch.countDown(); 44 //lock.notify(); 45 } 46 } 47 //} 48 } catch (InterruptedException e) { 49 e.printStackTrace(); 50 } 51 52 } 53 }, "t1"); 54 55 Thread t2 = new Thread(new Runnable() { 56 @Override 57 public void run() { 58 //synchronized (lock) { 59 if(list2.size() != 5){ 60 try { 61 //System.out.println("t2進入..."); 62 //lock.wait(); 63 countDownLatch.await(); 64 } catch (InterruptedException e) { 65 e.printStackTrace(); 66 } 67 } 68 System.out.println("當前執行緒:" + Thread.currentThread().getName() + "收到通知執行緒停止.."); 69 throw new RuntimeException(); 70 //} 71 } 72 }, "t2"); 73 74 t2.start(); 75 t1.start(); 76 77 } 78 }
問題1:wait()方法外面為什麼是while迴圈而不是if判斷?
問題2:notify()是喚醒一個執行緒,notifyAll()是喚醒全部執行緒,但是喚醒然後呢,不管是notify()還是notifyAll(),最終拿到鎖的只會有一個執行緒,那它們到底有什麼區別呢?
OK! 要回答上述兩個問題?我們首先需要明白java物件鎖的模型:
JVM 會為每一個使用內部鎖(synchronized)的物件維護兩個集合,Entry Set和Wait Set,也有人翻譯為鎖池和等待池,意思基本一致。
**Entry Set**
- 如果執行緒A已經持有了物件鎖,此時如果有其他執行緒也想獲得該物件鎖的話,它只能進入Entry Set,並且處於執行緒的BLOCKED狀態。
**Wait Set**
- 如果執行緒A呼叫了wait()方法,那麼執行緒A會釋放該物件的鎖,進入到Wait Set,並且處於執行緒的WAITING狀態。
sequenceDiagram
Entry Set(鎖池)->>Wait Set(等待池): wait()
Wait Set(等待池)->>Entry Set(鎖池): noitify()
注意:某個執行緒B想要獲得物件鎖,一般情況下有兩個先決條件,
- 一是物件鎖已經被釋放了(如曾經持有鎖的前任執行緒A執行完了synchronized程式碼塊或者呼叫了wait()方法等等)
- 二是執行緒B已處於RUNNABLE狀態。
那麼這兩類集合中的執行緒都是在什麼條件下可以轉變為RUNNABLE呢?
- 對於Entry Set中的執行緒,當物件鎖被釋放的時候,JVM會喚醒處於Entry Set中的某一個執行緒,這個執行緒的狀態就從BLOCKED轉變為RUNNABLE。
- 對於Wait Set中的執行緒,當物件的notify()方法被呼叫時,JVM會喚醒處於Wait Set中的某一個執行緒,這個執行緒的狀態就從WAITING轉變為RUNNABLE;或者當notifyAll()方法被呼叫時,Wait Set中的全部執行緒會轉變為RUNNABLE狀態。所有Wait Set中被喚醒的執行緒會被轉移到Entry Set中,然後 每當物件的鎖被釋放後,那些所有處於RUNNABLE狀態的執行緒會共同去競爭獲取物件的鎖.
解答
第一個問題 :wait()方法外面為什麼是while迴圈而不是if判斷?
- 因為wait()的執行緒永遠不能確定其他執行緒會在什麼狀態下notify(),所以必須在被喚醒、搶佔到鎖並且從wait()方法退出的時候再次進行指定條件的判斷,以決定是滿足條件往下執行呢還是不滿足條件再次wait()呢。
第二個問題:既然notify()和notifyAll()最終的結果都是隻有一個執行緒能拿到鎖,那喚醒一個和喚醒多個有什麼區別呢?
- 通過下面這個例子可以非常好的說明;是這樣一個場景:兩個生產者兩個消費者的場景,我們都使用notify()而非notifyAll(),假設消費者執行緒1拿到了鎖,判斷buffer為空,那麼wait(),釋放鎖;然後消費者2拿到了鎖,同樣buffer為空,wait(),也就是說此時Wait Set中有兩個執行緒;然後生產者1拿到鎖,生產,buffer滿,notify()了, 那麼可能消費者1被喚醒了,但是此時還有另一個執行緒生產者2在Entry Set中盼望著鎖,並且最終搶佔到了鎖, 但因為此時buffer是滿的,因此它要wait();然後消費者1拿到了鎖,消費,notify();這時就有問題了,此時生產者2和消費者2都在Wait Set中,buffer為空,如果喚醒生產者2,沒毛病;但如果喚醒了消費者2,因為buffer為空,它會再次wait(),這就尷尬了,萬一生產者1已經退出不再生產了,沒有其他執行緒在競爭鎖了,只有生產者2和消費者2在Wait Set中互相等待,那傳說中的死鎖就發生了。
- notify()換成notifyAll(),這樣的情況就不會再出現了,因為每次notifyAll()都會使其他等待的執行緒從Wait Set進入Entry Set,從而有機會獲得鎖。
1 import java.util.ArrayList; 2 import java.util.List; 3 4 public class Something { 5 private Buffer mBuf = new Buffer(); 6 7 public void produce() { 8 synchronized (this) { 9 while (mBuf.isFull()) { 10 try { 11 wait(); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 } 16 mBuf.add(); 17 notifyAll(); 18 } 19 } 20 21 public void consume() { 22 synchronized (this) { 23 while (mBuf.isEmpty()) { 24 try { 25 wait(); 26 } catch (InterruptedException e) { 27 e.printStackTrace(); 28 } 29 } 30 mBuf.remove(); 31 notifyAll(); 32 } 33 } 34 35 private class Buffer { 36 private static final int MAX_CAPACITY = 1; 37 private List innerList = new ArrayList<>(MAX_CAPACITY); 38 39 void add() { 40 if (isFull()) { 41 throw new IndexOutOfBoundsException(); 42 } else { 43 innerList.add(new Object()); 44 } 45 System.out.println(Thread.currentThread().toString() + " add"); 46 47 } 48 49 void remove() { 50 if (isEmpty()) { 51 throw new IndexOutOfBoundsException(); 52 } else { 53 innerList.remove(MAX_CAPACITY - 1); 54 } 55 System.out.println(Thread.currentThread().toString() + " remove"); 56 } 57 58 boolean isEmpty() { 59 return innerList.isEmpty(); 60 } 61 62 boolean isFull() { 63 return innerList.size() == MAX_CAPACITY; 64 } 65 } 66 67 public static void main(String[] args) { 68 Something sth = new Something(); 69 Runnable runProduce = new Runnable() { 70 int count = 4; 71 72 @Override 73 public void run() { 74 while (count-- > 0) { 75 sth.produce(); 76 } 77 } 78 }; 79 Runnable runConsume = new Runnable() { 80 int count = 4; 81 82 @Override 83 public void run() { 84 while (count-- > 0) { 85 sth.consume(); 86 } 87 } 88 }; 89 for (int i = 0; i < 2; i++) { 90 new Thread(runConsume).start(); 91 } 92 for (int i = 0; i < 2; i++) { 93 new Thread(runProduce).start(); 94 } 95 } 96 }
join
首先,join()是Thread類的一個方法,而不是object的方法;
JDK中是這樣描述的:
//join()方法的作用,是等待這個執行緒結束 public final void join()throws InterruptedException: Waits for this thread to die. 在Java 7 Concurrency Cookbook"的定義為: join() method suspends the execution of the calling thread until the object called finishes its execution. 也就是說,t.join()方法阻塞呼叫此方法的執行緒(calling thread),直到執行緒t完成,此執行緒再繼續; 舉個例子:通常用於在main()主執行緒內,等待其它執行緒完成再結束main()主執行緒。例如:
1 package com.maozw.springmvc.controller; 2 3 import java.util.Date; 4 import java.util.concurrent.TimeUnit; 5 6 public class JoinTest implements Runnable { 7 8 private String name; 9 10 public JoinTest(String name) { 11 this.name = name; 12 } 13 14 public void run() { 15 System.out.printf("%s begins: %s\n", name, new Date()); 16 try { 17 TimeUnit.SECONDS.sleep(4); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 System.out.printf("%s has finished: %s\n", name, new Date()); 22 } 23 24 public static void main(String[] args) { 25 Thread thread1 = new Thread(new JoinTest("One")); 26 Thread thread2 = new Thread(new JoinTest("Two")); 27 thread1.start(); 28 thread2.start(); 29 30 try { 31 thread1.join(); 32 thread2.join(); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 System.out.println("Main thread is finished"); 37 } 38 }
輸出結果
1 One begins: Mon Jul 23 22:41:21 CST 2018 2 Two begins: Mon Jul 23 22:41:21 CST 2018 3 Two has finished: Mon Jul 23 22:41:25 CST 2018 4 One has finished: Mon Jul 23 22:41:25 CST 2018 5 Main thread is finished
解說join原理:
我們嘗試去開啟的起原始碼:
- 通過原始碼可以看出,Join方法實現是通過wait。 當main執行緒呼叫t.join時候,main執行緒會獲得執行緒物件t的鎖(wait 意味著拿到該物件的鎖),呼叫該物件的wait(等待時間),直到該物件喚醒main執行緒 ,比如退出後。這就意味著main 執行緒呼叫t.join時,必須能夠拿到執行緒t物件的鎖。
/** * Waits for this thread to die. * * <p> An invocation of this method behaves in exactly the same * way as the invocation * * <blockquote> * {@linkplain #join(long) join}{@code (0)} * </blockquote> * * @throws InterruptedException * if any thread has interrupted the current thread. The * <i>interrupted status</i> of the current thread is * cleared when this exception is thrown. */ public final void join() throws InterruptedException { join(0); } /** * Waits at most {@code millis} milliseconds for this thread to * die. A timeout of {@code 0} means to wait forever. * * <p> This implementation uses a loop of {@code this.wait} calls * conditioned on {@code this.isAlive}. As a thread terminates the * {@code this.notifyAll} method is invoked. It is recommended that * applications not use {@code wait}, {@code notify}, or * {@code notifyAll} on {@code Thread} instances. * * @param millis * the time to wait in milliseconds * * @throws IllegalArgumentException * if the value of {@code millis} is negative * * @throws InterruptedException * if any thread has interrupted the current thread. The * <i>interrupted status</i> of the current thread is * cleared when this exception is thrown. */ public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }