1. 程式人生 > >生產者-消費者模式 資料共享通道:BlockingQueue

生產者-消費者模式 資料共享通道:BlockingQueue

  生產者-消費者模式是一個經典的多執行緒設計模式,它為多執行緒間的協作提供了良好的解決方案。在生產者-消費者模式中,通常有兩類執行緒,即若干個生產者執行緒和若干個消費者執行緒。生產者執行緒負責提交使用者請求,消費者執行緒負責具體處理生產者提交的任務。生產者和消費者之間通過共享記憶體緩衝區進行通訊。

  生產者-消費者模式的核心元件是共享記憶體緩衝區,它作為生產者和消費者時間通訊的橋樑,避免了生產者和消費者之間直接通訊,從而將生產者和消費者時間進行解耦。生產者不需要知道消費者的存在,消費者也不需要知道生產者的存在。同時由於緩衝區的存在,允許生產者和消費者在執行速度上存在時間差,無論是生產者在某一區域性時間內速度高於消費者,還是消費者在某一區域性時間內速度高於生產者,都可以通過共享記憶體緩衝區得到緩解,確保系統正常執行。

  生產者-消費者模式的主要角色輸入下表所示:

生產者-消費者模式主要角色
生產者 用於提交使用者請求,提取使用者任務,並裝入記憶體緩衝區
消費者 在記憶體緩衝區中提取並處理任務
記憶體緩衝區 快取生產者提交的任務或者資料,供消費者使用
任務 生產者向記憶體緩衝區提交的資料結構
Main 使用生產者和消費者的客戶端

下圖顯示了生產者-消費者模式一種實現的具體結構:

 

  其中,BlockingQueue充當了共享記憶體緩衝區,用於維護任務或資料佇列(PCData物件)。PCData物件表示一個生產任務或者相關任務的資料。生產者物件和消費者物件均引用同一個BlockingQueue物件。生產者負責建立PCData,並將它加入到BlockingQueue物件中,消費者則從同一個BlockingQueue中獲取PCData,並執行完該任務。
下面程式碼實現了基於生產者-消費者模式的求整數平方的並行程式。

 1 public class PCData {
 2     private final int intData;//資料
 3 
 4     public PCData(int intData){
 5         this.intData = intData;
 6     }
 7 
 8     public PCData(String d){
 9         intData = Integer.valueOf(d);
10     }
11 
12     public int getIntData(){
13         return intData;
14     }
15 16 @Override 17 public String toString(){ 18 return "data:" + intData; 19 } 20 }
 1 public class Producer implements Runnable {
 2 
 3     private volatile boolean isRunning = true;
 4     //記憶體緩衝區
 5     private BlockingQueue<PCData> queue;
 6     //總數,原子操作
 7     private static AtomicInteger count = new AtomicInteger();
 8     private static final int SLEEP_TIME = 1000;
 9 
10     public Producer(BlockingQueue<PCData> queue){
11         this.queue = queue;
12     }
13 
14     @Override
15     public void run() {
16         PCData data = null;
17         Random random = new Random();
18 
19         System.out.println("start producer id = " + Thread.currentThread().getId());
20         while (isRunning){
21             try {
22                 Thread.sleep(random.nextInt(SLEEP_TIME));
23                 data = new PCData(count.incrementAndGet());//構造任務資料
24                 System.out.println(data + " is put into queue!");
25                 if (!queue.offer(data,2, TimeUnit.SECONDS)){//提交資料到緩衝區,offer(),當佇列滿時,直接返回false。
26                     System.out.println("failed to put data:" + data);
27                 }
28             } catch (InterruptedException e) {
29                 e.printStackTrace();
30                 Thread.currentThread().interrupt();
31             }
32         }
33     }
34 
35     public void stop(){
36         isRunning = false;
37     }
38 }
public class Customer implements Runnable {

    private BlockingQueue<PCData> queue;//緩衝區
    private static final int SLEEP_TIME = 1000;

    public Customer(BlockingQueue<PCData> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("start customer id = " + Thread.currentThread().getId());
        Random random = new Random();

        while (true){
            try {
                PCData data = queue.poll();//提取資料
                if (null != data){
                    int re = data.getIntData() * data.getIntData();//計算平方
                    System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re));
                    Thread.sleep(random.nextInt(SLEEP_TIME));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class Client{
    //測試
    public static void main(String[] args) throws InterruptedException {
        //建立緩衝區
        BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
        Producer producer1 = new Producer(queue);//生產者
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Customer customer1 = new Customer(queue);//消費者
        Customer customer2 = new Customer(queue);
        Customer customer3 = new Customer(queue);

        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(producer1);//執行生產者
        es.execute(producer2);
        es.execute(producer3);
        es.execute(customer1);//執行消費者
        es.execute(customer2);
        es.execute(customer3);

        Thread.sleep(1000);
        producer1.stop();//停止生產者
        producer2.stop();
        producer3.stop();
        Thread.sleep(3000);
        es.shutdown();
    }
}

輸出結果:

start producer id = 11
start customer id = 14
start customer id = 15
start producer id = 12
start customer id = 16
start producer id = 13
data:1 is put into queue!
1*1=1
data:2 is put into queue!
2*2=4
data:3 is put into queue!
3*3=9
data:4 is put into queue!
4*4=16
data:5 is put into queue!
5*5=25
data:6 is put into queue!
6*6=36
data:7 is put into queue!
7*7=49
data:8 is put into queue!
8*8=64
data:9 is put into queue!
9*9=81

上述程式碼很簡單,看過文章資料共享通道:BlockingQueue後,註釋也比較詳細,就不再贅述程式碼的意思了。

總結:
  生產者-消費者模式很好的對生產者和消費者進行解耦,優化了系統整體結構。同時,由於緩衝區的作用,允許生產者執行緒和消費者執行緒存在執行上的效能差異,從一定的程度上緩解了效能瓶頸對系統性能的影響。