1. 程式人生 > >生產者/消費者模式(阻塞佇列) 一個經典的併發模型

生產者/消費者模式(阻塞佇列) 一個經典的併發模型

生產消費者模式也是關於執行緒阻塞的問題,生產消費者模式是通過觀察者模式來實現的。之前在編寫一個通訊軟體的時候用到了這種模式,通過維護一個BlockingQueue來完成Socket的訊息傳送,後來讀書時看到在伺服器開發時三層模型中的Service層在呼叫Dao層的時候也是通過這種模式來呼叫的,具體怎麼使用的還沒有具體實踐過,期待後面可以有機會練習這一塊。

實際的軟體開發過程中,經常會碰到如下場景:某個模組負責產生資料,這些資料由另一個模組來負責處理(此處的模組是廣義的,可以是類、函式、執行緒、程序等)。產生資料的模組,就形象地稱為生產者;而處理資料的模組,就稱為消費者。
 
單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。該模式還需要有一個緩衝區處於生產者和消費者之間,作為一箇中介。生產者把資料放入緩衝區,而消費者從緩衝區取出資料。

  • 解耦
      假設生產者和消費者分別是兩個類。如果讓生產者直接呼叫消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的程式碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了。
      
  • 支援併發(concurrency)
      生產者直接呼叫消費者的某個方法,還有另一個弊端。由於函式呼叫是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理資料很慢,生產者就會白白糟蹋大好時光。
      使用了生產者/消費者模式之後,生產者和消費者可以是兩個獨立的併發主體(常見併發型別有程序和執行緒兩種,後面的帖子會講兩種併發型別下的應用)。生產者把製造出來的資料往緩衝區一丟,就可以再去生產下一個資料。基本上不用依賴消費者的處理速度。其實當初這個模式,主要就是用來處理併發問題的。
      
  • 支援忙閒不均
      緩衝區還有另一個好處。如果製造資料的速度時快時慢,緩衝區的好處就體現出來了。當資料製造快的時候,消費者來不及處理,未處理的資料可以暫時存在緩衝區中。等生產者的製造速度慢下來,消費者再慢慢處理掉。

用了兩種方式實現了一下這個模式

1.方法一:

消費者:

public class TestConsumer implements Runnable {
    TestQueue queue;

    public TestConsumer() {
        // TODO Auto-generated constructor stub
    }

    public
TestConsumer(TestQueue obj) { this.queue = obj; } @Override public void run() { for (int i = 0; i < 10; i++) { try { queue.consumer(); } catch (Exception e) { e.printStackTrace(); } } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

生產者:

public class TestProduct implements Runnable {
    TestQueue t;

    public TestProduct() {

    }

    public TestProduct(TestQueue t) {
        this.t = t;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                t.product("test" + i);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

佇列:

public class TestQueue {
    public static Object signal = new Object();
    boolean bFull = false;
    private List thingsList = new ArrayList();

    /**
     * 生產
     * 
     * @param thing
     * @throws Exception
     */
    public void product(String thing) throws Exception {
        synchronized (signal) {
            if (!bFull) {
                bFull = true;
                System.out.println("product");
                thingsList.add(thing);
                signal.notify(); // 然後通知消費者
            }
        }
    }

    /**
     * 消費
     * 
     * @return
     * @throws Exception
     */
    public String consumer() {
        synchronized (signal) {
            if (!bFull) {
                // 佇列為空。等待.....
                try {
                    signal.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } // 進入signal待召佇列,等待生產者的通知
            }
            bFull = false;
            // 讀取buf 共享資源裡面的東西
            System.out.println("consume");
            signal.notify(); // 然後通知生產者
        }
        String result = "";
        if (thingsList.size() > 0) {
            result = thingsList.get(thingsList.size() - 1).toString();
            thingsList.remove(thingsList.size() - 1);
        }
        return result;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

測試程式碼:

public class Application {
    @Test
    public void test(){
        TestQueue queue = new TestQueue();
        Thread customer = new Thread(new TestConsumer(queue));
     Thread product = new Thread(new TestProduct(queue));
     customer.start();
     product.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.方法二:

使用java.util.concurrent.BlockingQueue類來重寫的佇列那個類,使用這個方法比較簡單。直接看JDK提供的demo

class Producer implements Runnable {  
   private final BlockingQueue queue;  
   Producer(BlockingQueue q) { queue = q; }  
    public void run() {  
      try {  
        while(true) { queue.put(produce()); }  
      } catch (InterruptedException ex) { ... handle ...}  
    }  
    Object produce() { ... }  
  }  

  class Consumer implements Runnable {  
    private final BlockingQueue queue;  
    Consumer(BlockingQueue q) { queue = q; }  
    public void run() {  
      try {  
        while(true) { consume(queue.take()); }  
      } catch (InterruptedException ex) { ... handle ...}  
    }  
    void consume(Object x) { ... }  
  }  

  class Setup {  
    void main() {  
      BlockingQueue q = new SomeQueueImplementation();  
      Producer p = new Producer(q);  
      Consumer c1 = new Consumer(q);  
      Consumer c2 = new Consumer(q);  
      new Thread(p).start();  
      new Thread(c1).start();  
      new Thread(c2).start();  
    }  
 }  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

在JDK1.5以上使用了Lock鎖來實現:

    class BoundedBuffer {  
      final Lock lock = new ReentrantLock();  
      final Condition notFull  = lock.newCondition();   
      final Condition notEmpty = lock.newCondition();   

      final Object[] items = new Object[100];  
      int putptr, takeptr, count;  

      public void put(Object x) throws InterruptedException {  
        lock.lock();  
        try {  
          while (count == items.length)   
            notFull.await();  
          items[putptr] = x;   
          if (++putptr == items.length) putptr = 0;  
          ++count;  
          notEmpty.signal();  
        } finally {  
          lock.unlock();  
        }  
      }  

      public Object take() throws InterruptedException {  
        lock.lock();  
        try {  
          while (count == 0)   
            notEmpty.await();  
          Object x = items[takeptr];   
          if (++takeptr == items.length) takeptr = 0;  
          --count;  
          notFull.signal();  
          return x;  
        } finally {  
          lock.unlock();  
        }  
      }   
    }