1. 程式人生 > 程式設計 >Java多執行緒併發生產者消費者設計模式例項解析

Java多執行緒併發生產者消費者設計模式例項解析

一、兩個執行緒一個生產者一個消費者

需求情景

兩個執行緒,一個負責生產,一個負責消費,生產者生產一個,消費者消費一個。

涉及問題

  • 同步問題:如何保證同一資源被多個執行緒併發訪問時的完整性。常用的同步方法是採用標記或加鎖機制。
  • wait() / nofity() 方法是基類Object的兩個方法,也就意味著所有Java類都會擁有這兩個方法,這樣,我們就可以為任何物件實現同步機制。
  • wait()方法:當緩衝區已滿/空時,生產者/消費者執行緒停止自己的執行,放棄鎖,使自己處於等待狀態,讓其他執行緒執行。
  • notify()方法:當生產者/消費者向緩衝區放入/取出一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

程式碼實現(共三個類和一個main方法的測試類)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*資源序號*/
  private int number = 0;
  /*資源標記*/
  private boolean flag = false;

  /**
   * 生產資源
   */
  public synchronized void create() {
    if (flag) {//先判斷標記是否已經生產了,如果已經生產,等待消費;
      try {
        wait();//讓生產執行緒等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生產一個
    System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
    flag = true;//將資源標記為已經生產
    notify();//喚醒在等待操作資源的執行緒(佇列)
  }

  /**
   * 消費資源
   */
  public synchronized void destroy() {
    if (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消費者****" + number);

    flag = false;
    notify();
  }
}

Producer.java

package com.demo.ProducerConsumer;

/**
 * 生產者
 * @author lixiaoxi
 *
 */
public class Producer implements Runnable{

  private Resource resource;

  public Producer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.create();
    }

  }
}

Consumer.java

package com.demo.ProducerConsumer;

/**
 * 消費者
 * @author lixiaoxi
 *
 */
public class Consumer implements Runnable{

  private Resource resource;

  public Consumer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.destroy();
    }

  }
}

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生產者執行緒
    new Thread(new Consumer(resource)).start();//消費者執行緒

  }
}

列印結果:

Java多執行緒併發生產者消費者設計模式例項解析

以上列印結果可以看出沒有任何問題。

二、多個執行緒,多個生產者和多個消費者的問題

需求情景

四個執行緒,兩個個負責生產,兩個個負責消費,生產者生產一個,消費者消費一個。

涉及問題

notifyAll()方法:當生產者/消費者向緩衝區放入/取出一個產品時,向其他等待的所有執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

再次測試程式碼

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生產者執行緒
    new Thread(new Producer(resource)).start();//生產者執行緒
    new Thread(new Consumer(resource)).start();//消費者執行緒
    new Thread(new Consumer(resource)).start();//消費者執行緒

  }
}

執行結果:

Java多執行緒併發生產者消費者設計模式例項解析

Java多執行緒併發生產者消費者設計模式例項解析

通過以上列印結果發現問題

147生產了一次,消費了兩次。169生產了,而沒有消費。

原因分析

當兩個執行緒同時操作生產者生產或者消費者消費時,如果有生產者或消費者的兩個執行緒都wait()時,再次notify(),由於其中一個執行緒已經改變了標記而另外一個執行緒再次往下直接執行的時候沒有判斷標記而導致的。if判斷標記,只有一次,會導致不該執行的執行緒運行了。出現了資料錯誤的情況。

解決方案

while判斷標記,解決了執行緒獲取執行權後,是否要執行!也就是每次wait()後再notify()時先再次判斷標記。

程式碼改進(Resource中的 if -> while)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*資源序號*/
  private int number = 0;
  /*資源標記*/
  private boolean flag = false;

  /**
   * 生產資源
   */
  public synchronized void create() {
    while (flag) {//先判斷標記是否已經生產了,如果已經生產,等待消費;
      try {
        wait();//讓生產執行緒等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生產一個
    System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
    flag = true;//將資源標記為已經生產
    notify();//喚醒在等待操作資源的執行緒(佇列)
  }

  /**
   * 消費資源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消費者****" + number);

    flag = false;
    notify();
  }
}

執行結果:

Java多執行緒併發生產者消費者設計模式例項解析

再次發現問題

列印到某個值比如生產完187,程式執行卡死了,好像鎖死了一樣。

原因分析

notify:只能喚醒一個執行緒,如果本方喚醒了本方,沒有意義。而且while判斷標記+notify會導致”死鎖”。

解決方案

notifyAll解決了本方執行緒一定會喚醒對方執行緒的問題。

最後程式碼改進(Resource中的 notify() -> notifyAll())

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*資源序號*/
  private int number = 0;
  /*資源標記*/
  private boolean flag = false;

  /**
   * 生產資源
   */
  public synchronized void create() {
    while (flag) {//先判斷標記是否已經生產了,如果已經生產,等待消費;
      try {
        wait();//讓生產執行緒等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生產一個
    System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
    flag = true;//將資源標記為已經生產
    notifyAll();//喚醒在等待操作資源的執行緒(佇列)
  }

  /**
   * 消費資源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消費者****" + number);

    flag = false;
    notifyAll();
  }
}

執行結果:

Java多執行緒併發生產者消費者設計模式例項解析

以上就大功告成了,沒有任何問題。

再來梳理一下整個流程。按照示例,生產者消費者交替執行,每次生產後都有對應的消費者,測試類建立例項,如果是生產者先執行,進入run()方法,進入create()方法,flag預設為false,number+1,生產者生產一個產品,flag置為true,同時呼叫notifyAll()方法,喚醒所有正在等待的執行緒,接下來如果還是生產者執行呢?這是flag為true,進入while迴圈,執行wait()方法,接下來如果是消費者執行的話,呼叫destroy()方法,這時flag為true,消費者購買了一次產品,隨即將flag置為false,並喚醒所有正在等待的執行緒。這就是一次完整的多生產者對應多消費者的問題。

三、使用Lock和Condition來解決生產者消費者問題

上面的程式碼有一個問題,就是我們為了避免所有的執行緒都處於等待的狀態,使用了notifyAll方法來喚醒所有的執行緒,即notifyAll喚醒的是自己方和對方執行緒。如果我需要只是喚醒對方的執行緒,比如:生產者只能喚醒消費者的執行緒,消費者只能喚醒生產者的執行緒。

在jdk1.5當中為我們提供了多執行緒的升級解決方案:

1. 將同步synchronized替換成了Lock操作。

2. 將Object中的wait,notify,notifyAll方法替換成了Condition物件。

3. 可以只喚醒對方的執行緒。

完整程式碼:

Resource1.java

package com.demo.ProducerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource1 {

  /*資源序號*/
  private int number = 0;
  /*資源標記*/
  private boolean flag = false;
  
  private Lock lock = new ReentrantLock();
  //使用lock建立生產者的condition物件
  private Condition condition_pro = lock.newCondition(); 
  //使用lock建立消費者的condition物件
  private Condition condition_con = lock.newCondition(); 


  /**
   * 生產資源
   */
  public void create() throws InterruptedException {
    
    try{
      lock.lock();
      //先判斷標記是否已經生產了,如果已經生產,等待消費
      while(flag){
        //生產者等待
        condition_pro.await();
      }
      //生產一個
      number++;
      System.out.println(Thread.currentThread().getName() + "生產者------------" + number);
      //將資源標記為已經生產
      flag = true;
      //生產者生產完畢後,喚醒消費者的執行緒(注意這裡不是signalAll)
      condition_con.signal();
    }finally{
      lock.unlock();
    }
  }

  /**
   * 消費資源
   */
  public void destroy() throws InterruptedException{

    try{
      lock.lock();
      //先判斷標記是否已經消費了,如果已經消費,等待生產
      while(!flag){
        //消費者等待
        condition_con.await();
      }
      
      System.out.println(Thread.currentThread().getName() + "消費者****" + number);
      //將資源標記為已經消費
      flag = false;
      //消費者消費完畢後,喚醒生產者的執行緒
      condition_pro.signal();
    }finally{
      lock.unlock();
    }
  }
}

Producer1.java

package com.demo.ProducerConsumer;

/**
 * 生產者
 * @author lixiaoxi
 *
 */
public class Producer1 implements Runnable{

  private Resource1 resource;

  public Producer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.create();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

Consumer1.java

package com.demo.ProducerConsumer;

/**
 * 消費者
 * @author lixiaoxi
 *
 */
public class Consumer1 implements Runnable{

  private Resource1 resource;

  public Consumer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.destroy();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

ProducerConsumerTest1.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest1 {

  public static void main(String args[]) {
    Resource1 resource = new Resource1();
    new Thread(new Producer1(resource)).start();//生產者執行緒
    new Thread(new Producer1(resource)).start();//生產者執行緒
    new Thread(new Consumer1(resource)).start();//消費者執行緒
    new Thread(new Consumer1(resource)).start();//消費者執行緒

  }
}

執行結果:

Java多執行緒併發生產者消費者設計模式例項解析

四、總結

1、如果生產者、消費者都是1個,那麼flag標記可以用if判斷。這裡有多個,必須用while判斷。

2、在while判斷的同時,notify函式可能喚醒本類執行緒(如一個消費者喚醒另一個消費者),這會導致所有消費者忙等待,程式無法繼續往下執行。使用notifyAll函式代替notify可以解決這個問題,notifyAll可以保證非本類執行緒被喚醒(消費者執行緒能喚醒生產者執行緒,反之也可以),解決了忙等待問題。

小心假死

生產者/消費者模型最終達到的目的是平衡生產者和消費者的處理能力,達到這個目的的過程中,並不要求只有一個生產者和一個消費者。可以多個生產者對應多個消費者,可以一個生產者對應一個消費者,可以多個生產者對應一個消費者。

假死就發生在上面三種場景下。假死指的是全部執行緒都進入了WAITING狀態,那麼程式就不再執行任何業務功能了,整個專案呈現停滯狀態。

比方說有生產者A和生產者B,緩衝區由於空了,消費者處於WAITING。生產者B處於WAITING,生產者A被消費者通知生產,生產者A生產出來的產品本應該通知消費者,結果通知了生產者B,生產者B被喚醒,發現緩衝區滿了,於是繼續WAITING。至此,兩個生產者執行緒處於WAITING,消費者處於WAITING,系統假死。

上面的分析可以看出,假死出現的原因是因為notify的是同類,所以非單生產者/單消費者的場景,可以採取兩種方法解決這個問題:

(1)synchronized用notifyAll()喚醒所有執行緒、ReentrantLock用signalAll()喚醒所有執行緒。

(2)用ReentrantLock定義兩個Condition,一個表示生產者的Condition,一個表示消費者的Condition,喚醒的時候呼叫相應的Condition的signal()方法就可以了。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。