生產者消費者三種併發模式實現方法
阿新 • • 發佈:2019-02-20
package cn.luxh.app.test; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;/** * Lock實現的生產者和消費者 * */ public class ProducerCustomerWithLock { Executor pool = Executors.newFixedThreadPool(10); //倉庫 private List<String> storageList = new LinkedList<String>(); //倉庫容量 private int MAX_SIZE = 3; //倉庫為空 private int ZERO = 0;//獲取鎖物件 private Lock lock = new ReentrantLock(); //倉庫滿了,繫結生產者執行緒 private Condition full = lock.newCondition(); //倉庫為空,繫結消費者執行緒 private Condition empty = lock.newCondition(); //生產者執行緒 private class Producer implements Runnable{ //生產方法,需同步 privatevoid produce(){ if(lock.tryLock()) { System.out.println(Thread.currentThread().getName()+"進入倉庫,準備生產!"); try { if(storageList.size()==MAX_SIZE) { System.out.println("倉庫已滿!等待消費者消費"); Thread.sleep(1000); full.await();//生產者執行緒加入執行緒等待池 } if(storageList.size()<MAX_SIZE){ String name = "產品"+new Random().nextInt(); storageList.add(name); System.out.println(Thread.currentThread().getName()+"往倉庫中生產了一個產品!"); } Thread.sleep(1000); empty.signalAll();//喚醒消費者執行緒 }catch(InterruptedException ie) { System.out.println("中斷異常"); ie.printStackTrace(); }finally{ lock.unlock(); } } } @Override public void run() { while(true) { produce(); } } } //消費者執行緒 private class Customer implements Runnable{ //消費方法,需同步 private void consume() { if(lock.tryLock()) { System.out.println(Thread.currentThread().getName()+"進入倉庫,準備消費!"); try { if(storageList.size()==ZERO) { System.out.println("倉庫已空!等待生產者生產"); Thread.sleep(1000); empty.await();//消費者執行緒加入執行緒等待池 } if(storageList.size()!=ZERO) { System.out.println(Thread.currentThread().getName()+"從倉庫取得產品:"+storageList.remove(0)); } Thread.sleep(1000); full.signalAll();//喚醒生產者執行緒 }catch(InterruptedException ie) { System.out.println("中斷異常"); ie.printStackTrace(); }finally{ lock.unlock(); } } } @Override public void run() { while(true) { consume(); } } } //啟動生產者和消費者執行緒 public void start() { for(int i=1;i<5;i++) { //new Thread(new Producer()).start(); //new Thread(new Customer()).start(); pool.execute(new Producer()); pool.execute(new Customer()); } } public static void main(String[] args) { ProducerCustomerWithLock pc = new ProducerCustomerWithLock(); pc.start(); } }
package cn.luxh.app.test; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * BlockingQueue實現的生產者和消費者 * */ public class ProducerCustomerWithBlockingQueue { Executor pool = Executors.newFixedThreadPool(10); //倉庫 private BlockingQueue<String> storageQueue = new LinkedBlockingQueue<String>(5); //倉庫容量 private int MAX_SIZE = 3; //倉庫為空 private int ZERO = 0; //生產者執行緒 private class Producer implements Runnable{ //生產方法,需同步 private void produce(){ try { System.out.println(Thread.currentThread().getName()+"進入倉庫,準備生產!"); if(storageQueue.size()==MAX_SIZE) { System.out.println("倉庫已滿!等待消費者消費"); Thread.sleep(1000); } if(storageQueue.size()<=MAX_SIZE) { String product = "產品"+new Random().nextInt(); storageQueue.put(product); System.out.println(Thread.currentThread().getName()+"往倉庫中生產了一個產品!"); } Thread.sleep(1000); }catch(InterruptedException ie) { System.out.println("中斷異常"); ie.printStackTrace(); } } @Override public void run() { while(true) { produce(); } } } //消費者執行緒 private class Customer implements Runnable{ //消費方法,需同步 private void consume() { try { System.out.println(Thread.currentThread().getName()+"進入倉庫,準備消費!"); if(storageQueue.size()==ZERO) { System.out.println("倉庫已空!等待生產者生產"); Thread.sleep(1000); } if(storageQueue.size()!=ZERO) { System.out.println(Thread.currentThread().getName()+"從倉庫取得產品:"+storageQueue.take()); } Thread.sleep(1000); }catch(InterruptedException ie) { System.out.println("中斷異常"); ie.printStackTrace(); } } @Override public void run() { while(true) { consume(); } } } //啟動生產者和消費者執行緒 public void start() { for(int i=1;i<5;i++) { //new Thread(new Producer()).start(); ///new Thread(new Customer()).start(); pool.execute(new Producer()); pool.execute(new Customer()); } } public static void main(String[] args) { ProducerCustomerWithBlockingQueue pc = new ProducerCustomerWithBlockingQueue(); pc.start(); } }