Java多執行緒-----實現生產者消費者模式的幾種方式
阿新 • • 發佈:2018-12-22
1 生產者消費者模式概述
生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,
直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞佇列就是用來給生產者和消費者解耦的。
在執行緒世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,
才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,便有了生產者和消費者模式。
2 實現生產者消費者模式
產品類
package com.thread.pc.blockingqueue;

import java.util.UUID;

/**
 * Product handed from producers to consumers; it carries only a UUID code.
 *
 * @author yyx, 2018-12-22
 */
public class Product {
    /** Unique code of this product. */
    private UUID proCode;

    /**
     * Creates a product with the given code.
     *
     * @param proCode unique code of the product
     */
    public Product(UUID proCode) {
        super();
        this.proCode = proCode;
    }

    /**
     * Returns the unique code of this product.
     *
     * @return the code passed to the constructor or last set via {@link #setProCode(UUID)}
     */
    public UUID getProCode() {
        return proCode;
    }

    /**
     * Replaces the unique code of this product.
     *
     * @param proCode the new code
     */
    public void setProCode(UUID proCode) {
        this.proCode = proCode;
    }
}
生產者
package com.thread.pc.blockingqueue;

/**
 * Producer task: sleeps for one second, then asks the warehouse to add a
 * product, and repeats until the running thread is interrupted.
 *
 * @author yyx, 2018-12-22
 */
public class Producer implements Runnable {
    /** Warehouse that receives the products. */
    private final Warehouse warehouse;

    /**
     * Creates a producer bound to the given warehouse.
     *
     * @param warehouse warehouse whose {@code addProduct()} is called on every cycle
     */
    public Producer(Warehouse warehouse) {
        super();
        this.warehouse = warehouse;
    }

    /**
     * Runs the produce loop. An interrupt (either pending when a cycle starts
     * or delivered during the sleep) ends the loop instead of being printed
     * and ignored, so the thread can actually be stopped.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() cleared the flag when it threw; set it again for
                // whoever inspects this thread afterwards, then stop producing.
                Thread.currentThread().interrupt();
                return;
            }
            warehouse.addProduct();
        }
    }
}
消費者
package com.thread.pc.blockingqueue;

/**
 * Consumer task: sleeps for one second, then asks the warehouse to remove a
 * product, and repeats until the running thread is interrupted.
 *
 * @author yyx, 2018-12-22
 */
public class Consumer implements Runnable {
    /** Warehouse the products are taken from. */
    private final Warehouse warehouse;

    /**
     * Creates a consumer bound to the given warehouse.
     *
     * @param warehouse warehouse whose {@code removeProduct()} is called on every cycle
     */
    public Consumer(Warehouse warehouse) {
        super();
        this.warehouse = warehouse;
    }

    /**
     * Runs the consume loop. An interrupt (either pending when a cycle starts
     * or delivered during the sleep) ends the loop instead of being printed
     * and ignored, so the thread can actually be stopped.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // sleep() cleared the flag when it threw; set it again for
                // whoever inspects this thread afterwards, then stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
            warehouse.removeProduct();
        }
    }
}
2.1 使用lock、condition和await、signalAll
倉庫類
package com.thread.pc.lockcondition;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Warehouse guarded by an explicit {@link Lock} with two {@link Condition}s:
 * producers wait on one while the list is full, consumers wait on the other
 * while it is empty.
 *
 * @author yyx, 2018-12-22
 */
public class Warehouse {
    /** Maximum number of products held at once. */
    private static final int MAX_SIZE = 10;

    private List<Product> listProduct;
    private Lock lock;
    private Condition producerCondition;
    private Condition consumerCondition;

    /**
     * Creates a warehouse over the given storage and synchronization objects.
     *
     * @param listProduct       backing list; only accessed while {@code lock} is held
     * @param lock              lock guarding {@code listProduct}
     * @param producerCondition condition producers wait on while the list is full;
     *                          expected to have been created from {@code lock}
     * @param consumerCondition condition consumers wait on while the list is empty;
     *                          expected to have been created from {@code lock}
     */
    public Warehouse(List<Product> listProduct, Lock lock, Condition producerCondition,
            Condition consumerCondition) {
        super();
        this.listProduct = listProduct;
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
    }

    /**
     * Adds one new product, waiting while the warehouse is full.
     *
     * <p>The wait sits in a {@code while} loop so the size is re-checked after
     * every wake-up, and the product is added once there is room. If the
     * thread is interrupted while waiting, nothing is added and the interrupt
     * flag is set again for the caller.
     */
    public void addProduct() {
        lock.lock();
        try {
            String currentName = Thread.currentThread().getName();
            while (listProduct.size() >= MAX_SIZE) {
                System.out.println("產品列表已滿,不再生產!" + currentName + "進入等待");
                producerCondition.await();
            }
            Product product = new Product(UUID.randomUUID());
            System.out.println(currentName + "生產了一個產品,它的編號是:" + product.getProCode().toString());
            listProduct.add(product);
            consumerCondition.signalAll();
        } catch (InterruptedException e) {
            // await() cleared the flag when it threw; restore it for the caller.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the oldest product, waiting while the warehouse is empty.
     *
     * <p>The wait sits in a {@code while} loop so the size is re-checked after
     * every wake-up. If the thread is interrupted while waiting, nothing is
     * removed and the interrupt flag is set again for the caller.
     */
    public void removeProduct() {
        lock.lock();
        try {
            String currentName = Thread.currentThread().getName();
            while (listProduct.isEmpty()) {
                System.out.println("產品列表不足,不再消費!" + currentName + "進入等待");
                consumerCondition.await();
            }
            Product product = listProduct.remove(0);
            System.out.println(currentName + "消費了一個產品,它的編號是:" + product.getProCode().toString());
            producerCondition.signalAll();
        } catch (InterruptedException e) {
            // await() cleared the flag when it threw; restore it for the caller.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}
測試類
package com.thread.pc.lockcondition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demo entry point for the Lock/Condition variant: two producer threads and
 * one consumer thread share a single warehouse.
 *
 * @author (not stated), 2018-12-22
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> storage = new ArrayList<Product>();
        Lock guard = new ReentrantLock();
        // Both conditions come from the same lock that guards the list.
        Condition notFull = guard.newCondition();
        Condition notEmpty = guard.newCondition();
        Warehouse warehouse = new Warehouse(storage, guard, notFull, notEmpty);

        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        // The same Producer instance backs both producer threads.
        Runnable[] tasks = { producer, producer, consumer };
        for (Runnable task : tasks) {
            new Thread(task).start();
        }
    }
}
2.2 使用synchronized修飾方法
倉庫類
package com.thread.pc.synchronizedmethod;

import java.util.List;
import java.util.UUID;

/**
 * Warehouse whose operations are {@code synchronized} methods, so the monitor
 * used for locking, {@code wait()} and {@code notifyAll()} is this warehouse
 * instance.
 *
 * @author yyx, 2018-12-22
 */
public class Warehouse {
    /** Maximum number of products held at once. */
    private static final int MAX_SIZE = 10;

    private List<Product> listProduct;

    /**
     * Creates a warehouse over the given backing list.
     *
     * @param listProduct list that stores the products; accessed only from the
     *                    synchronized methods of this class
     */
    public Warehouse(List<Product> listProduct) {
        super();
        this.listProduct = listProduct;
    }

    /**
     * Adds one new product, waiting while the warehouse is full.
     *
     * <p>The wait sits in a {@code while} loop so the size is re-checked after
     * every wake-up; this matters here because producers and consumers share
     * one monitor and {@code notifyAll()} wakes both kinds. If the thread is
     * interrupted while waiting, nothing is added and the interrupt flag is
     * set again for the caller.
     */
    public synchronized void addProduct() {
        String currentName = Thread.currentThread().getName();
        while (listProduct.size() >= MAX_SIZE) {
            try {
                System.out.println("產品列表已滿,不再生產!" + currentName + "進入等待");
                wait();
            } catch (InterruptedException e) {
                // wait() cleared the flag when it threw; restore it and give up.
                Thread.currentThread().interrupt();
                return;
            }
        }
        Product product = new Product(UUID.randomUUID());
        System.out.println(currentName + "生產了一個產品,它的編號是:" + product.getProCode().toString());
        listProduct.add(product);
        notifyAll();
    }

    /**
     * Removes the oldest product, waiting while the warehouse is empty.
     *
     * <p>The wait sits in a {@code while} loop so the size is re-checked after
     * every wake-up. If the thread is interrupted while waiting, nothing is
     * removed and the interrupt flag is set again for the caller.
     */
    public synchronized void removeProduct() {
        String currentName = Thread.currentThread().getName();
        while (listProduct.isEmpty()) {
            try {
                System.out.println("產品列表不足,不再消費!" + currentName + "進入等待");
                wait();
            } catch (InterruptedException e) {
                // wait() cleared the flag when it threw; restore it and give up.
                Thread.currentThread().interrupt();
                return;
            }
        }
        Product product = listProduct.remove(0);
        System.out.println(currentName + "消費了一個產品,它的編號是:" + product.getProCode().toString());
        notifyAll();
    }
}
測試類
package com.thread.pc.synchronizedmethod;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo entry point for this package's warehouse: two producer threads and two
 * consumer threads share a single warehouse.
 *
 * @author yyx, 2018-12-22
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> storage = new ArrayList<Product>();
        Warehouse warehouse = new Warehouse(storage);

        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        // One Producer and one Consumer instance, each run by two threads.
        Runnable[] tasks = { producer, producer, consumer, consumer };
        for (Runnable task : tasks) {
            new Thread(task).start();
        }
    }
}
2.3 使用synchronized修飾程式碼塊
倉庫類
package com.thread.pc.synchronizedcodeblock;

import java.util.List;
import java.util.UUID;

/**
 * Warehouse whose operations use {@code synchronized} code blocks on the
 * product list, so the list is the monitor used for locking, {@code wait()}
 * and {@code notifyAll()}.
 *
 * @author yyx, 2018-12-22
 */
public class Warehouse {
    /** Maximum number of products held at once. */
    private static final int MAX_SIZE = 10;

    private List<Product> listProduct;

    /**
     * Creates a warehouse over the given backing list.
     *
     * @param listProduct list that stores the products and also serves as the
     *                    monitor for all synchronization in this class
     */
    public Warehouse(List<Product> listProduct) {
        super();
        this.listProduct = listProduct;
    }

    /**
     * Adds one new product, waiting while the warehouse is full.
     *
     * <p>The wait sits in a {@code while} loop so the size is re-checked after
     * every wake-up; this matters here because producers and consumers share
     * one monitor and {@code notifyAll()} wakes both kinds. If the thread is
     * interrupted while waiting, nothing is added and the interrupt flag is
     * set again for the caller.
     */
    public void addProduct() {
        synchronized (listProduct) {
            String currentName = Thread.currentThread().getName();
            while (listProduct.size() >= MAX_SIZE) {
                try {
                    System.out.println("產品列表已滿,不再生產!" + currentName + "進入等待");
                    listProduct.wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag when it threw; restore it and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            Product product = new Product(UUID.randomUUID());
            System.out.println(currentName + "生產了一個產品,它的編號是:" + product.getProCode().toString());
            listProduct.add(product);
            listProduct.notifyAll();
        }
    }

    /**
     * Removes the oldest product, waiting while the warehouse is empty.
     *
     * <p>The wait sits in a {@code while} loop so the size is re-checked after
     * every wake-up. If the thread is interrupted while waiting, nothing is
     * removed and the interrupt flag is set again for the caller.
     */
    public void removeProduct() {
        synchronized (listProduct) {
            String currentName = Thread.currentThread().getName();
            while (listProduct.isEmpty()) {
                try {
                    System.out.println("產品列表不足,不再消費!" + currentName + "進入等待");
                    listProduct.wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag when it threw; restore it and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            Product product = listProduct.remove(0);
            System.out.println(currentName + "消費了一個產品,它的編號是:" + product.getProCode().toString());
            listProduct.notifyAll();
        }
    }
}
測試類
package com.thread.pc.synchronizedcodeblock;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo entry point for this package's warehouse: two producer threads and one
 * consumer thread share a single warehouse.
 *
 * @author yyx, 2018-12-22
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> storage = new ArrayList<Product>();
        Warehouse warehouse = new Warehouse(storage);

        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        // The same Producer instance backs both producer threads.
        Runnable[] tasks = { producer, producer, consumer };
        for (Runnable task : tasks) {
            new Thread(task).start();
        }
    }
}
2.4 使用BlockingQueue
倉庫類
package com.thread.pc.blockingqueue;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;

/**
 * Warehouse backed by a {@link BlockingQueue}; the queue itself provides the
 * thread-safe hand-off between producers and consumers.
 *
 * @author yyx, 2018-12-22
 */
public class Warehouse {
    /** Size at which {@link #addProduct()} stops producing. */
    private static final int MAX_SIZE = 10;

    private BlockingQueue<Product> blockingQueue;

    /**
     * Creates a warehouse over the given queue.
     *
     * @param blockingQueue queue that stores the products
     */
    public Warehouse(BlockingQueue<Product> blockingQueue) {
        super();
        this.blockingQueue = blockingQueue;
    }

    /**
     * Adds one new product unless the queue already holds {@code MAX_SIZE}
     * products, in which case it only prints a message and returns.
     *
     * <p>The size check and the {@code put} are two separate steps, so the
     * check is advisory: if the supplied queue is bounded and another producer
     * fills it in between, {@code put} blocks until space is available. If the
     * thread is interrupted during that wait, the product is not added and the
     * interrupt flag is set again for the caller.
     */
    public void addProduct() {
        String currentName = Thread.currentThread().getName();
        if (blockingQueue.size() >= MAX_SIZE) {
            System.out.println("產品列表已滿,不再生產!" + currentName + "進入等待");
            return;
        }
        Product product = new Product(UUID.randomUUID());
        System.out.println(currentName + "生產了一個產品,它的編號是:" + product.getProCode().toString());
        try {
            blockingQueue.put(product);
        } catch (InterruptedException e) {
            // put() cleared the flag when it threw; restore it for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes one product unless the queue is empty, in which case it only
     * prints a message and returns.
     *
     * <p>The size check and the {@code take} are two separate steps, so the
     * check is advisory: if another consumer empties the queue in between,
     * {@code take} blocks until a product arrives. If the thread is
     * interrupted during that wait, nothing is consumed and the interrupt flag
     * is set again for the caller.
     */
    public void removeProduct() {
        String currentName = Thread.currentThread().getName();
        if (blockingQueue.size() <= 0) {
            System.out.println("產品列表不足,不再消費!" + currentName + "進入等待");
            return;
        }
        try {
            Product product = blockingQueue.take();
            System.out.println(currentName + "消費了一個產品,它的編號是:" + product.getProCode().toString());
        } catch (InterruptedException e) {
            // take() cleared the flag when it threw; restore it for the caller.
            Thread.currentThread().interrupt();
        }
    }
}
測試類
package com.thread.pc.blockingqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Demo entry point for the BlockingQueue variant: two producer threads and one
 * consumer thread share a warehouse backed by a queue of capacity 10.
 *
 * @author yyx, 2018-12-22
 */
public class TestModel {
    public static void main(String[] args) {
        BlockingQueue<Product> queue = new LinkedBlockingQueue<>(10);
        Warehouse warehouse = new Warehouse(queue);

        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        // The same Producer instance backs both producer threads.
        Runnable[] tasks = { producer, producer, consumer };
        for (Runnable task : tasks) {
            new Thread(task).start();
        }
    }
}