java併發——wait()、notify()、notifyAll()、join()
鎖池和等待池
wait() ,notifyAll(),notify() 三個方法都是Object類中的方法。學習他們之前先要了解倆個概念。java中,每個物件都有兩個池,鎖(monitor)池和等待池。
鎖池:假設執行緒A已經擁有了某個物件(注意:不是類)的鎖,而其它的執行緒想要呼叫這個物件的某個synchronized方法(或者synchronized塊),由於這些執行緒在進入物件的synchronized方法之前必須先獲得該物件的鎖的擁有權,但是該物件的鎖目前正被執行緒A擁有,所以這些執行緒就進入了該物件的鎖池中。 等待池:假設一個執行緒A呼叫了某個物件的wait()方法,執行緒A就會釋放該物件的鎖(因為wait()方法必須出現在synchronized中,這樣自然在執行wait()方法之前執行緒A就已經擁有了該物件的鎖),同時執行緒A就進入到了該物件的等待池中。如果另外的一個執行緒呼叫了相同物件的notifyAll()方法,那麼處於該物件的等待池中的執行緒就會全部進入該物件的鎖池中,準備爭奪鎖的擁有權。如果另外的一個執行緒呼叫了相同物件的notify()方法,那麼僅僅有一個處於該物件的等待池中的執行緒(隨機)會進入該物件的鎖池。
notify()、notifyAll()、wait()
notify():通知一個在物件上等待的執行緒,由WAITING狀態變為BLOCKING狀態,從等待佇列移動到同步佇列,等待CPU排程獲取該物件的鎖,當該執行緒獲取到了物件的鎖後,該執行緒從wait()方法返回。 notifyAll():通知所有等待在該物件上的執行緒,由WAITING狀態變為BLOCKING狀態,等待CPU排程獲取該物件的鎖。 wait():呼叫該方法的執行緒進入WAITING狀態,並將當前執行緒放置到物件的等待佇列,只有等待另外執行緒的通知或被中斷才會返回,需要注意,呼叫wait()方法後,會釋放物件的鎖。 wait(long):
簡單的例子:
等待通知機制,是指一個執行緒A呼叫了物件lock的wait()方法進入等待狀態,而另一個執行緒B呼叫了物件lock的notify()或者notifyAll()方法,執行緒A收到通知後從物件lock的wait()方法返回,進而執行後續操作。上述兩個執行緒通過物件lock來完成互動,而物件上的wait()和notify/notifyAll()的關係就如同開關訊號一樣,用來完成等待方和通知方之間的互動工作。
public class WaitNotify{
//不需要為volatile,因為對於flag的操作均在synchronized鎖的保護下進行,可以保證flag的記憶體可見性
static boolean flag = true;
static Object lock = new Object();
public static void main(String args[]) throws Exception{
Thread waitThread = new Thread(new Wait(),"WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);//1 second -> package java.util.cocurrent
Thread notifyThread = new Thread(new Notify(),"NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable{
public void run(){
//加鎖,擁有lock的monitor
synchronized(lock){
//當條件不滿足時,繼續wait,同時釋放了lock的鎖
while(flag){
try{
System.out.println("flag is ture, Wait");
lock.wait();
}catch(InterruptedException e){
//除了notify通知,帶超時的wait()方法、執行緒中斷機制也能喚醒此執行緒
}
}
System.out.println("flag is false,complete");
}
}
}
static class Notify implements Runnable{
public void run(){
synchronized(lock){
//獲取lock的鎖,然後進行通知,通知時不會釋放lock的鎖,直到當前執行緒釋放了lock,呼叫了notifyAll,並且WaitThread獲得了鎖之後,wait執行緒才能從wait()方法返回
System.out.println("Notify get lock ,begin notify");
lock.notifyAll();
flag = false;
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized(lock){
System.out.println("Notify get lock again");
}
}
}
}
注意:
- wait()、notify/notifyAll() 方法是Object的本地final方法,無法被重寫。
- wait()使當前執行緒阻塞,前提是 必須先獲得鎖,一般配合synchronized 關鍵字使用,即,一般在synchronized 同步程式碼塊裡使用 wait()、notify/notifyAll() 方法。
- notify/notifyAll() 的執行只是喚醒沉睡的執行緒,而不會立即釋放鎖,鎖的釋放要看程式碼塊的具體執行情況。所以在程式設計中,儘量在使用了notify/notifyAll() 後立即退出臨界區,以喚醒其他執行緒 。
- wait() 需要被try catch包圍,中斷也可以使wait等待的執行緒喚醒。
- notify 和wait 的順序不能錯,如果A執行緒先執行notify方法,B執行緒在執行wait方法,那麼B執行緒是無法被喚醒的。
- notify方法只喚醒一個等待(物件的)執行緒並使該執行緒開始執行。所以如果有多個執行緒等待一個物件,這個方法只會喚醒其中一個執行緒,選擇哪個執行緒取決於作業系統對多執行緒管理的實現。notifyAll 會喚醒所有等待(物件的)執行緒,儘管哪一個執行緒將會第一個處理取決於作業系統的實現。如果當前情況下有多個執行緒需要被喚醒,推薦使用notifyAll 方法。
- 多執行緒中要測試某個條件的變化,要使用while。只有當前值滿足需要值的時候,執行緒才可以往下執行,所以,必須使用while 迴圈阻塞。注意,wait() 當被喚醒時候,只是讓while迴圈繼續往下走.如果此處用if的話,意味著if繼續往下走,會跳出if語句塊。但是,notifyAll 只是負責喚醒執行緒,並不保證條件,所以需要手動來保證程式的邏輯。
生產者和消費者
什麼是生產者-消費者問題呢?假設有一個公共的容量有限的池子,有兩種人,一種是生產者,另一種是消費者。需要滿足如下條件: 1、生產者產生資源往池子裡新增,前提是池子沒有滿,如果池子滿了,則生產者暫停生產,直到自己的生成能放下池子。 2、消費者消耗池子裡的資源,前提是池子的資源不為空,否則消費者暫停消耗,進入等待直到池子裡有資源數滿足自己的需求。
功能實現
- 共享倉庫介面
public interface AbstractStorage {
void consume(int num);
void produce(int num);
}
- 共享倉庫物件
public class Storage1 implements AbstractStorage {
//倉庫最大容量
private final int MAX_SIZE = 100;
//倉庫儲存的載體
private LinkedList list = new LinkedList();
//生產產品
public void produce(int num){
//同步
synchronized (list){
//倉庫剩餘的容量不足以存放即將要生產的數量,暫停生產
while(list.size()+num > MAX_SIZE){
System.out.println("【要生產的產品數量】:" + num + "\t【庫存量】:"
+ list.size() + "\t暫時不能執行生產任務!");
try {
//條件不滿足,生產阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for(int i=0;i<num;i++){
list.add(new Object());
}
System.out.println("【已經生產產品數】:" + num + "\t【現倉儲量為】:" + list.size());
list.notifyAll();
}
}
//消費產品
public void consume(int num){
synchronized (list){
//不滿足消費條件
while(num > list.size()){
System.out.println("【要消費的產品數量】:" + num + "\t【庫存量】:"
+ list.size() + "\t暫時不能執行生產任務!");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消費條件滿足,開始消費
for(int i=0;i<num;i++){
list.remove();
}
System.out.println("【已經消費產品數】:" + num + "\t【現倉儲量為】:" + list.size());
list.notifyAll();
}
}
}
- 生成者
public class Producer extends Thread{
//每次生產的數量
private int num ;
//所屬的倉庫
public AbstractStorage abstractStorage;
public Producer(AbstractStorage abstractStorage){
this.abstractStorage = abstractStorage;
}
public void setNum(int num){
this.num = num;
}
// 執行緒run函式
@Override
public void run() {
produce(num);
}
// 呼叫倉庫Storage的生產函式
public void produce(int num) {
abstractStorage.produce(num);
}
}
- 消費者
public class Consumer extends Thread{
// 每次消費的產品數量
private int num;
// 所在放置的倉庫
private AbstractStorage abstractStorage1;
// 建構函式,設定倉庫
public Consumer(AbstractStorage abstractStorage1) {
this.abstractStorage1 = abstractStorage1;
}
// 執行緒run函式
public void run() {
consume(num);
}
// 呼叫倉庫Storage的生產函式
public void consume(int num) {
abstractStorage1.consume(num);
}
public void setNum(int num){
this.num = num;
}
}
- 測試類
public class Test{
public static void main(String[] args) {
// 倉庫物件
AbstractStorage abstractStorage = new Storage1();
// 生產者物件
Producer p1 = new Producer(abstractStorage);
Producer p2 = new Producer(abstractStorage);
Producer p3 = new Producer(abstractStorage);
Producer p4 = new Producer(abstractStorage);
Producer p5 = new Producer(abstractStorage);
Producer p6 = new Producer(abstractStorage);
Producer p7 = new Producer(abstractStorage);
// 消費者物件
Consumer c1 = new Consumer(abstractStorage);
Consumer c2 = new Consumer(abstractStorage);
Consumer c3 = new Consumer(abstractStorage);
// 設定生產者產品生產數量
p1.setNum(10);
p2.setNum(10);
p3.setNum(10);
p4.setNum(10);
p5.setNum(10);
p6.setNum(10);
p7.setNum(80);
// 設定消費者產品消費數量
c1.setNum(50);
c2.setNum(20);
c3.setNum(30);
// 執行緒開始執行
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
}
}
- 測試結果
【要消費的產品數量】:30 【庫存量】:0 暫時不能執行生產任務!
【已經生產產品數】:10 【現倉儲量為】:10
【要消費的產品數量】:30 【庫存量】:10 暫時不能執行生產任務!
【已經生產產品數】:10 【現倉儲量為】:20
【要消費的產品數量】:30 【庫存量】:20 暫時不能執行生產任務!
【已經生產產品數】:10 【現倉儲量為】:30
【已經消費產品數】:30 【現倉儲量為】:0
【要消費的產品數量】:50 【庫存量】:0 暫時不能執行生產任務!
【要消費的產品數量】:20 【庫存量】:0 暫時不能執行生產任務!
【已經生產產品數】:10 【現倉儲量為】:10
【已經生產產品數】:10 【現倉儲量為】:20
【已經消費產品數】:20 【現倉儲量為】:0
【要消費的產品數量】:50 【庫存量】:0 暫時不能執行生產任務!
【已經生產產品數】:10 【現倉儲量為】:10
【已經生產產品數】:80 【現倉儲量為】:90
【已經消費產品數】:50 【現倉儲量為】:40
join()
Thread類中的join方法的主要作用就是同步,它可以使得執行緒之間的並行執行變為序列執行。
例子:
public class JoinTest {
public static void main(String [] args) throws InterruptedException {
ThreadJoinTest t1 = new ThreadJoinTest("小明");
ThreadJoinTest t2 = new ThreadJoinTest("小東");
t1.start();
/**join的意思是使得放棄當前執行緒的執行,並返回對應的執行緒,例如下面程式碼的意思就是:
程式在main執行緒中呼叫t1執行緒的join方法,則main執行緒放棄cpu控制權,並返回t1執行緒繼續執行直到執行緒t1執行完畢
所以結果是t1執行緒執行完後,才到主執行緒執行,相當於在main執行緒中同步t1執行緒,t1執行完了,main執行緒才有執行的機會
*/
t1.join();
t2.start();
}
}
class ThreadJoinTest extends Thread{
public ThreadJoinTest(String name){
super(name);
}
@Override
public void run(){
for(int i=0;i<1000;i++){
System.out.println(this.getName() + ":" + i);
}
}
}
上面程式結果是先列印完小明執行緒,在列印小東執行緒;
上面註釋也大概說明了join方法的作用:在A執行緒中呼叫了B執行緒的join()方法時,表示只有當B執行緒執行完畢時,A執行緒才能繼續執行。注意,這裡呼叫的join方法是沒有傳參的,join方法其實也可以傳遞一個引數給它的。例如:t1.join(10);:如果A執行緒中掉用B執行緒的join(10),則表示A執行緒會等待B執行緒執行10毫秒,10毫秒過後,A、B執行緒並行執行。需要注意的是,jdk規定,join(0)的意思不是A執行緒等待B執行緒0秒,而是A執行緒等待B執行緒無限時間,直到B執行緒執行完畢,即join(0)等價於join()。
join方法必須線上程start方法呼叫之後呼叫才有意義。這個也很容易理解:如果一個執行緒都沒有start,那它也就無法同步了。
原始碼:
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
我們可以看到原始碼使用了wait()方法。(注意這個join方法本身就是synchronized),這樣當前執行緒即處於等待狀態,必須執行notify()或notifyAll()才能喚醒,但實際工作上執行完run方法後,並不需要執行notify(),但後繼程式碼也會被喚醒並執行了,這是什麼原因呢?通過對Jvm natvie的原始碼分析,我們發現thread執行完成後,cpp的原始碼中會在thread執行完畢後,會呼叫exit方法,該方法中原來隱含有呼叫notify_all(thread)的動作。