多線程生產者/消費者模式實現
阿新 • • 發佈:2018-09-18
als true not eat bre creat 測試類 void group
參考書籍《java多線程編程核心技術》
都是基於wait/notify實現的
一個生產者和一個消費者:操作值
1 package com.qf.test10.pojo; 2 3 /** 4 * @author qf 5 * @create 2018-09-18 15:59 6 */ 7 public class Entity { 8 public static String value = ""; 9 }
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5/** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 生產者類 9 */ 10 public class Producer { 11 private String lock; 12 13 public Producer(String lock) { 14 this.lock = lock; 15 } 16 17 public void setValue(){ 18 try { 19 synchronized (lock){20 if(!Entity.value.equals("")){ 21 lock.wait(); 22 } 23 String value = System.currentTimeMillis()+"_"+System.nanoTime(); 24 System.out.println("set的值是"+value); 25 Entity.value = value; 26 lock.notify();27 } 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } 31 } 32 }
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 消費者類 9 */ 10 public class Consumer { 11 private String lock; 12 13 public Consumer(String lock) { 14 this.lock = lock; 15 } 16 17 public void getValue(){ 18 try { 19 synchronized (lock){ 20 if(Entity.value.equals("")){ 21 lock.wait(); 22 } 23 System.out.println("get的值"+Entity.value); 24 Entity.value = ""; 25 lock.notify(); 26 } 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 }
線程類
1 package com.qf.test10.thread; 2 3 import com.qf.test10.Producer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 16:08 8 */ 9 public class ThreadP extends Thread { 10 private Producer producer; 11 12 public ThreadP(Producer producer) { 13 this.producer = producer; 14 } 15 16 @Override 17 public void run() { 18 while(true) { 19 producer.setValue(); 20 } 21 } 22 }
1 package com.qf.test10.thread; 2 3 import com.qf.test10.Consumer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 16:11 8 */ 9 public class ThreadC extends Thread { 10 private Consumer consumer; 11 12 public ThreadC(Consumer consumer) { 13 this.consumer = consumer; 14 } 15 16 @Override 17 public void run() { 18 while (true) { 19 consumer.getValue(); 20 } 21 } 22 }
測試運行
1 package com.qf.test10; 2 3 import com.qf.test10.thread.ThreadC; 4 import com.qf.test10.thread.ThreadP; 5 6 /** 7 * @author qf 8 * @create 2018-09-18 16:12 9 */ 10 public class Run { 11 public static void main(String[] args) { 12 String lock = new String(""); 13 Producer p = new Producer(lock); 14 Consumer c = new Consumer(lock); 15 ThreadP tp = new ThreadP(p); 16 tp.start(); 17 ThreadC tc = new ThreadC(c); 18 tc.start(); 19 } 20 }
打印輸出
set的值是1537259244097_800479975994656
get的值1537259244097_800479975994656
set的值是1537259244097_800479976020503
get的值1537259244097_800479976020503
set的值是1537259244097_800479976042246
get的值1537259244097_800479976042246
set的值是1537259244097_800479976062349
get的值1537259244097_800479976062349
set的值是1537259244097_800479976083272
get的值1537259244097_800479976083272
set的值是1537259244097_800479976103785
get的值1537259244097_800479976103785
set的值是1537259244097_800479976124298
get的值1537259244097_800479976124298
set的值是1537259244097_800479976144400
get的值1537259244097_800479976144400
.............
如果以此為基礎,設計多個生產者和多個消費者,那麽運行過程中很可能會發生假死的情況,也就是所有線程都呈現等待的狀態
多個生產者與多個消費者:操作值
修改Producer.java,Consumer.java以及測試類
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 生產者類 9 */ 10 public class Producer { 11 private String lock; 12 13 public Producer(String lock) { 14 this.lock = lock; 15 } 16 17 public void setValue(){ 18 try { 19 synchronized (lock){ 20 while (!Entity.value.equals("")){ 21 System.out.println("生產者 "+Thread.currentThread().getName()+" WAITING了★"); 22 lock.wait(); 23 } 24 System.out.println("生產者 "+Thread.currentThread().getName()+" RUNNABLE了"); 25 String value = System.currentTimeMillis()+"_"+System.nanoTime(); 26 //System.out.println("set的值是"+value); 27 Entity.value = value; 28 lock.notify(); 29 } 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 } 34 }
1 package com.qf.test10; 2 3 import com.qf.test10.pojo.Entity; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 消費者類 9 */ 10 public class Consumer { 11 private String lock; 12 13 public Consumer(String lock) { 14 this.lock = lock; 15 } 16 17 public void getValue(){ 18 try { 19 synchronized (lock){ 20 if(Entity.value.equals("")){ 21 System.out.println("消費者 "+Thread.currentThread().getName()+" WAITING了☆"); 22 lock.wait(); 23 } 24 System.out.println("消費者 "+Thread.currentThread().getName()+" RUNNABLE了"); 25 //System.out.println("get的值"+Entity.value); 26 Entity.value = ""; 27 lock.notify(); 28 } 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 } 33 }
1 package com.qf.test10; 2 3 import com.qf.test10.thread.ThreadC; 4 import com.qf.test10.thread.ThreadP; 5 6 /** 7 * @author qf 8 * @create 2018-09-18 16:12 9 */ 10 public class Run { 11 public static void main(String[] args) throws InterruptedException { 12 String lock = new String(""); 13 Producer p = new Producer(lock); 14 Consumer c = new Consumer(lock); 15 /*ThreadP tp = new ThreadP(p); 16 tp.start(); 17 ThreadC tc = new ThreadC(c); 18 tc.start();*/ 19 ThreadP[] threadPS = new ThreadP[2]; 20 ThreadC[] threadCS = new ThreadC[2]; 21 for (int i = 0; i < 2; i++) { 22 threadPS[i] = new ThreadP(p); 23 threadPS[i].setName("生產者"+(i+1)); 24 threadPS[i].start(); 25 threadCS[i] = new ThreadC(c); 26 threadCS[i].setName("消費者"+(i+1)); 27 threadCS[i].start(); 28 } 29 30 Thread.sleep(5000); 31 Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()]; 32 Thread.currentThread().getThreadGroup().enumerate(threads); 33 for (int i = 0; i < threads.length; i++) { 34 System.out.println(threads[i].getName()+" "+threads[i].getState()); 35 } 36 } 37 }
打印結果
生產者 生產者1 RUNNABLE了 生產者 生產者1 WAITING了★ 生產者 生產者2 WAITING了★ 消費者 消費者1 RUNNABLE了 消費者 消費者1 WAITING了☆ 生產者 生產者1 RUNNABLE了 生產者 生產者1 WAITING了★ 生產者 生產者2 WAITING了★ 消費者 消費者2 RUNNABLE了 消費者 消費者2 WAITING了☆ 消費者 消費者1 RUNNABLE了 消費者 消費者1 WAITING了☆ 生產者 生產者1 RUNNABLE了 生產者 生產者1 WAITING了★ 生產者 生產者2 WAITING了★ main RUNNABLE Monitor Ctrl-Break RUNNABLE 生產者1 WAITING 消費者1 WAITING 生產者2 WAITING 消費者2 WAITING
主要原因是因為notify可能喚醒的是同類(生產者喚醒生產者,消費者喚醒消費者)。最終導致所有線程都處於WAITING狀態,程序進而呈現假死狀態
只要將Producer和Consumer中的notify修改為notifyAll即可,這樣就不至於出現假死狀態
一個生產者和一個消費者:操作棧
1 package com.qf.test11.pojo; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 /** 7 * @author qf 8 * @create 2018-09-18 17:14 9 */ 10 public class MyStack { 11 private List list = new ArrayList(); 12 synchronized public void push(){ 13 try { 14 if (list.size() == 1){ 15 this.wait(); 16 } 17 list.add("test"+Math.random()); 18 this.notify(); 19 System.out.println("push = "+list.size()); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } 23 } 24 public synchronized void pop(){ 25 try { 26 if(list.size() == 0){ 27 //System.out.println("pop操作: "+Thread.currentThread().getName()+"線程wait狀態"); 28 this.wait(); 29 } 30 System.out.println("pop操作: "+Thread.currentThread().getName()+"線程,獲取值="+list.get(0)); 31 list.remove(0); 32 this.notify(); 33 System.out.println("pop = "+list.size()); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 } 38 }
生產者/消費者
1 package com.qf.test11; 2 3 import com.qf.test11.pojo.MyStack; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:13 8 * 生產者 9 */ 10 public class Producer { 11 private MyStack myStack; 12 13 public Producer(MyStack myStack) { 14 this.myStack = myStack; 15 } 16 17 public void pushService(){ 18 myStack.push(); 19 } 20 }
1 package com.qf.test11; 2 3 import com.qf.test11.pojo.MyStack; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:14 8 */ 9 public class Consumer { 10 private MyStack myStack; 11 12 public Consumer(MyStack myStack) { 13 this.myStack = myStack; 14 } 15 public void popService(){ 16 myStack.pop(); 17 } 18 }
線程類
1 package com.qf.test11.thread; 2 3 import com.qf.test11.Producer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:13 8 */ 9 public class ThreadP extends Thread { 10 private Producer producer; 11 12 public ThreadP(Producer producer) { 13 this.producer = producer; 14 } 15 16 @Override 17 public void run() { 18 while (true){ 19 producer.pushService(); 20 } 21 } 22 }
1 package com.qf.test11.thread; 2 3 import com.qf.test11.Consumer; 4 5 /** 6 * @author qf 7 * @create 2018-09-18 17:14 8 */ 9 public class ThreadC extends Thread { 10 private Consumer consumer; 11 12 public ThreadC(Consumer consumer) { 13 this.consumer = consumer; 14 } 15 16 @Override 17 public void run() { 18 while (true){ 19 consumer.popService(); 20 } 21 } 22 }
測試運行
1 package com.qf.test11; 2 3 import com.qf.test11.pojo.MyStack; 4 import com.qf.test11.thread.ThreadC; 5 import com.qf.test11.thread.ThreadP; 6 7 /** 8 * @author qf 9 * @create 2018-09-18 17:34 10 */ 11 public class Run { 12 public static void main(String[] args) { 13 MyStack myStack = new MyStack(); 14 Producer p = new Producer(myStack); 15 Consumer c = new Consumer(myStack); 16 ThreadP tp = new ThreadP(p); 17 ThreadC tc = new ThreadC(c); 18 tp.setName("tp"); 19 tc.setName("tc"); 20 tp.start(); 21 tc.start(); 22 } 23 }
打印結果
push = 1 pop操作: tc線程,獲取值=test0.8957260024057878 pop = 0 push = 1 pop操作: tc線程,獲取值=test0.9236606274738514 pop = 0 push = 1 pop操作: tc線程,獲取值=test0.7661156573296891 pop = 0 push = 1 pop操作: tc線程,獲取值=test0.6523634151650343 pop = 0 push = 1 pop操作: tc線程,獲取值=test0.08728918553111287 pop = 0 push = 1 pop操作: tc線程,獲取值=test0.472483808512989 pop = 0 push = 1 pop操作: tc線程,獲取值=test0.17456918848050884 pop = 0 push = 1 pop操作: tc線程,獲取值=test0.1785536700399648 pop = 0 ............
多線程生產者/消費者模式實現