生產者/消費者模式(阻塞佇列) 一個經典的併發模型
阿新 • • 發佈:2018-11-07
生產消費者模式也是關於執行緒阻塞的問題,生產消費者模式是通過觀察者模式來實現的。之前在編寫一個通訊軟體的時候用到了這種模式,通過維護一個BlockingQueue來完成Socket的訊息傳送,後來讀書時看到在伺服器開發時三層模型中的Service層在呼叫Dao層的時候也是通過這種模式來呼叫的,具體怎麼使用的還沒有具體實踐過,期待後面可以有機會練習這一塊。
實際的軟體開發過程中,經常會碰到如下場景:某個模組負責產生資料,這些資料由另一個模組來負責處理(此處的模組是廣義的,可以是類、函式、執行緒、程序等)。產生資料的模組,就形象地稱為生產者;而處理資料的模組,就稱為消費者。
單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。該模式還需要有一個緩衝區處於生產者和消費者之間,作為一箇中介。生產者把資料放入緩衝區,而消費者從緩衝區取出資料。
- 解耦
假設生產者和消費者分別是兩個類。如果讓生產者直接呼叫消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的程式碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了。
- 支援併發(concurrency)
生產者直接呼叫消費者的某個方法,還有另一個弊端。由於函式呼叫是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理資料很慢,生產者就會白白糟蹋大好時光。
使用了生產者/消費者模式之後,生產者和消費者可以是兩個獨立的併發主體(常見併發型別有程序和執行緒兩種,後面的帖子會講兩種併發型別下的應用)。生產者把製造出來的資料往緩衝區一丟,就可以再去生產下一個資料。基本上不用依賴消費者的處理速度。其實當初這個模式,主要就是用來處理併發問題的。
- 支援忙閒不均
緩衝區還有另一個好處。如果製造資料的速度時快時慢,緩衝區的好處就體現出來了。當資料製造快的時候,消費者來不及處理,未處理的資料可以暫時存在緩衝區中。等生產者的製造速度慢下來,消費者再慢慢處理掉。
用了兩種方式實現了一下這個模式
1.方法一:
消費者:
public class TestConsumer implements Runnable {
TestQueue queue;
public TestConsumer() {
// TODO Auto-generated constructor stub
}
public TestConsumer(TestQueue obj) {
this.queue = obj;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
queue.consumer();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
生產者:
public class TestProduct implements Runnable {
TestQueue t;
public TestProduct() {
}
public TestProduct(TestQueue t) {
this.t = t;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
t.product("test" + i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
佇列:
public class TestQueue {
public static Object signal = new Object();
boolean bFull = false;
private List thingsList = new ArrayList();
/**
* 生產
*
* @param thing
* @throws Exception
*/
public void product(String thing) throws Exception {
synchronized (signal) {
if (!bFull) {
bFull = true;
System.out.println("product");
thingsList.add(thing);
signal.notify(); // 然後通知消費者
}
}
}
/**
* 消費
*
* @return
* @throws Exception
*/
public String consumer() {
synchronized (signal) {
if (!bFull) {
// 佇列為空。等待.....
try {
signal.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} // 進入signal待召佇列,等待生產者的通知
}
bFull = false;
// 讀取buf 共享資源裡面的東西
System.out.println("consume");
signal.notify(); // 然後通知生產者
}
String result = "";
if (thingsList.size() > 0) {
result = thingsList.get(thingsList.size() - 1).toString();
thingsList.remove(thingsList.size() - 1);
}
return result;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
測試程式碼:
public class Application {
@Test
public void test(){
TestQueue queue = new TestQueue();
Thread customer = new Thread(new TestConsumer(queue));
Thread product = new Thread(new TestProduct(queue));
customer.start();
product.start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
2.方法二:
使用java.util.concurrent.BlockingQueue類來重寫的佇列那個類,使用這個方法比較簡單。直接看JDK提供的demo
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
在JDK1.5以上使用了Lock鎖來實現:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}