生產者/消費者模式實現
wait/notify最經典的案例就是"生產者/消費者"模式。但是此模式有一些需要注意的地方。
1. 一個簡單的生產者消費者
一個執行緒向集合中新增元素,兩個集合從集合中刪除元素,與之前等待/通知部落格的最後一個案例類似。
package cn.qlq.thread.seven; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 利用之前的等待/通知實現一個簡單的生產者消費者 * *@author Administrator * */ public class Demo1 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo1.class); public static void main(String[] args) throws InterruptedException { final List<String> list = new ArrayList<String>(); // 刪除元素執行緒1Thread sub1 = new Thread(new Runnable() { @Override public void run() { try { synchronized (list) { while (true) { while (list.size() == 0) { list.wait(); } LOGGER.info("list.remove ->{}, threadName->{}", list.get(0), Thread.currentThread().getName()); list.remove(0); list.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "sub1"); sub1.start(); // 刪除元素執行緒2 Thread sub2 = new Thread(new Runnable() { @Override public void run() { try { synchronized (list) { while (true) { while (list.size() == 0) { list.wait(); } LOGGER.info("list.remove ->{}, threadName->{}", list.get(0), Thread.currentThread().getName()); list.remove(0); list.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "sub2"); sub2.start(); // 增加元素執行緒 Thread.sleep(1 * 1000); Thread addThread = new Thread(new Runnable() { @Override public void run() { try { for (int i = 0; i < 5; i++) { synchronized (list) { list.add(i + ""); LOGGER.info("新增元素->{},threadName->{}", i, Thread.currentThread().getName()); list.notifyAll(); list.wait(); } } } catch (InterruptedException e) { LOGGER.error("InterruptedException error", e); } } }, "B"); addThread.start(); } }
結果:
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->0,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->0, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->1,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->1, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->2,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->2, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->3,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->3, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->4,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->4, threadName->sub2
2. 多生產與多消費:操作值-假死
假死的現象其實就是進入waiting狀態。如果全部執行緒都進入waiting狀態,則程式就不再執行任何業務功能了,整個專案呈停止狀態。
例如兩個生產者兩個消費者最後處於假死狀態的例子:
package cn.qlq.thread.seven; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.util.logging.resources.logging; /** * 多生產與多消費:操作值-假死( 多生產與多消費保證只有一個元素生產與消費) * * @author Administrator * */ public class Demo2 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class); public static void main(String[] args) throws InterruptedException { final List<String> list = new ArrayList<String>(); // 刪除元素執行緒1 Thread sub1 = new Thread(new Runnable() { @Override public void run() { try { synchronized (list) { while (true) { while (list.size() == 0) { LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName()); list.wait(); LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName()); } LOGGER.info("list.remove ->{}, threadName->{}", list.get(0), Thread.currentThread().getName()); list.remove(0); list.notify(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "sub1"); sub1.start(); // 刪除元素執行緒2 Thread sub2 = new Thread(new Runnable() { @Override public void run() { try { synchronized (list) { while (true) { while (list.size() == 0) { LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName()); list.wait(); LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName()); } LOGGER.info("list.remove ->{}, threadName->{}", list.get(0), Thread.currentThread().getName()); list.remove(0); list.notify(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "sub2"); sub2.start(); // 增加元素執行緒 Thread.sleep(1 * 1000); Thread addThread = new Thread(new Runnable() { @Override public void run() { try { for (int i = 0; i < 50000; i++) { synchronized (list) { while (list.size() != 0) { LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName()); list.wait(); LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName()); } list.add(i + ""); LOGGER.info("新增元素->{},threadName->{}", i, Thread.currentThread().getName()); list.notify(); } } } catch (InterruptedException e) { LOGGER.error("InterruptedException error", e); } } }, "add1"); addThread.start(); // 增加元素執行緒 Thread.sleep(1 * 1000); Thread addThread2 = new Thread(new Runnable() { @Override public void run() { try { for (int i = 0; i < 50000; i++) { synchronized (list) { while (list.size() != 0) { LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName()); list.wait(); LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName()); } list.add(i + ""); LOGGER.info("新增元素->{},threadName->{}", Thread.currentThread().getName()); list.notify(); } } } catch (InterruptedException e) { LOGGER.error("InterruptedException error", e); } } }, "add2"); addThread2.start(); Thread.sleep(10 * 1000); LOGGER.info("sub1 state->{}", sub1.getState()); LOGGER.info("sub2 state->{}", sub2.getState()); LOGGER.info("add1 state->{}", addThread.getState()); LOGGER.info("add2 state->{}", addThread2.getState()); } }
結果:
18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub1
18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 新增元素->0,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] list.remove ->0, threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub2
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 新增元素->add2,threadName->{}
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add2
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub2 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add2 state->WAITING
解釋一下上面的執行緒假死的原因:
由於喚醒執行緒呼叫的是notify()喚醒單個執行緒,所以有可能喚醒的是同類的執行緒,也就是生產者喚醒的是生產者,消費者喚醒的是消費者。導致最後四個執行緒都處於waiting狀態。
解決辦法:
喚醒的時候採用notifyAll()喚醒所有的執行緒喚醒所有的執行緒,避免喚醒同類執行緒。