生產者-消費者模式 資料共享通道:BlockingQueue
生產者-消費者模式是一個經典的多執行緒設計模式,它為多執行緒間的協作提供了良好的解決方案。在生產者-消費者模式中,通常有兩類執行緒,即若干個生產者執行緒和若干個消費者執行緒。生產者執行緒負責提交使用者請求,消費者執行緒負責具體處理生產者提交的任務。生產者和消費者之間通過共享記憶體緩衝區進行通訊。
生產者-消費者模式的核心元件是共享記憶體緩衝區,它作為生產者和消費者時間通訊的橋樑,避免了生產者和消費者之間直接通訊,從而將生產者和消費者時間進行解耦。生產者不需要知道消費者的存在,消費者也不需要知道生產者的存在。同時由於緩衝區的存在,允許生產者和消費者在執行速度上存在時間差,無論是生產者在某一區域性時間內速度高於消費者,還是消費者在某一區域性時間內速度高於生產者,都可以通過共享記憶體緩衝區得到緩解,確保系統正常執行。
生產者-消費者模式的主要角色輸入下表所示:
生產者 | 用於提交使用者請求,提取使用者任務,並裝入記憶體緩衝區 |
消費者 | 在記憶體緩衝區中提取並處理任務 |
記憶體緩衝區 | 快取生產者提交的任務或者資料,供消費者使用 |
任務 | 生產者向記憶體緩衝區提交的資料結構 |
Main | 使用生產者和消費者的客戶端 |
下圖顯示了生產者-消費者模式一種實現的具體結構:
其中,BlockingQueue充當了共享記憶體緩衝區,用於維護任務或資料佇列(PCData物件)。PCData物件表示一個生產任務或者相關任務的資料。生產者物件和消費者物件均引用同一個BlockingQueue物件。生產者負責建立PCData,並將它加入到BlockingQueue物件中,消費者則從同一個BlockingQueue中獲取PCData,並執行完該任務。
下面程式碼實現了基於生產者-消費者模式的求整數平方的並行程式。
1 public class PCData { 2 private final int intData;//資料 3 4 public PCData(int intData){ 5 this.intData = intData; 6 } 7 8 public PCData(String d){ 9 intData = Integer.valueOf(d); 10 } 11 12 public int getIntData(){ 13 return intData; 14 }15 16 @Override 17 public String toString(){ 18 return "data:" + intData; 19 } 20 }
1 public class Producer implements Runnable { 2 3 private volatile boolean isRunning = true; 4 //記憶體緩衝區 5 private BlockingQueue<PCData> queue; 6 //總數,原子操作 7 private static AtomicInteger count = new AtomicInteger(); 8 private static final int SLEEP_TIME = 1000; 9 10 public Producer(BlockingQueue<PCData> queue){ 11 this.queue = queue; 12 } 13 14 @Override 15 public void run() { 16 PCData data = null; 17 Random random = new Random(); 18 19 System.out.println("start producer id = " + Thread.currentThread().getId()); 20 while (isRunning){ 21 try { 22 Thread.sleep(random.nextInt(SLEEP_TIME)); 23 data = new PCData(count.incrementAndGet());//構造任務資料 24 System.out.println(data + " is put into queue!"); 25 if (!queue.offer(data,2, TimeUnit.SECONDS)){//提交資料到緩衝區,offer(),當佇列滿時,直接返回false。 26 System.out.println("failed to put data:" + data); 27 } 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 Thread.currentThread().interrupt(); 31 } 32 } 33 } 34 35 public void stop(){ 36 isRunning = false; 37 } 38 }
public class Customer implements Runnable { private BlockingQueue<PCData> queue;//緩衝區 private static final int SLEEP_TIME = 1000; public Customer(BlockingQueue<PCData> queue){ this.queue = queue; } @Override public void run() { System.out.println("start customer id = " + Thread.currentThread().getId()); Random random = new Random(); while (true){ try { PCData data = queue.poll();//提取資料 if (null != data){ int re = data.getIntData() * data.getIntData();//計算平方 System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re)); Thread.sleep(random.nextInt(SLEEP_TIME)); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } } }
public class Client{ //測試 public static void main(String[] args) throws InterruptedException { //建立緩衝區 BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10); Producer producer1 = new Producer(queue);//生產者 Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Customer customer1 = new Customer(queue);//消費者 Customer customer2 = new Customer(queue); Customer customer3 = new Customer(queue); ExecutorService es = Executors.newCachedThreadPool(); es.execute(producer1);//執行生產者 es.execute(producer2); es.execute(producer3); es.execute(customer1);//執行消費者 es.execute(customer2); es.execute(customer3); Thread.sleep(1000); producer1.stop();//停止生產者 producer2.stop(); producer3.stop(); Thread.sleep(3000); es.shutdown(); } }
輸出結果:
start producer id = 11 start customer id = 14 start customer id = 15 start producer id = 12 start customer id = 16 start producer id = 13 data:1 is put into queue! 1*1=1 data:2 is put into queue! 2*2=4 data:3 is put into queue! 3*3=9 data:4 is put into queue! 4*4=16 data:5 is put into queue! 5*5=25 data:6 is put into queue! 6*6=36 data:7 is put into queue! 7*7=49 data:8 is put into queue! 8*8=64 data:9 is put into queue! 9*9=81
上述程式碼很簡單,看過文章資料共享通道:BlockingQueue後,註釋也比較詳細,就不再贅述程式碼的意思了。
總結:
生產者-消費者模式很好的對生產者和消費者進行解耦,優化了系統整體結構。同時,由於緩衝區的作用,允許生產者執行緒和消費者執行緒存在執行上的效能差異,從一定的程度上緩解了效能瓶頸對系統性能的影響。