Producer-Consumer模型:二、如何實現
阿新 • • 發佈:2018-11-28
Producer-Consumer模型的實現主要考慮一下幾個方便:
- 生成者的實現
- 消費者的實現
- 訊息佇列
- 執行緒的通訊、等待、同步
- 執行緒的終止
1.Producer
生產者負責把請求加入佇列,如果佇列已滿則等待或者返回錯誤。
public class Producer{
// 訊息佇列
private LinkedList<Request> buffer = null;
// 佇列的容量
private int maxBufferSize = 5;
private NormalLoggerI logger;
public Producer(LinkedList<Request> buffer, int maxBufferSize, NormalLoggerI logger) {
this.buffer = buffer;
this.maxBufferSize = maxBufferSize;
this.logger = logger;
}
public boolean add(Request request) {
try {
// 請求佇列是競爭資源,只有一道程序可以讀寫
synchronized (buffer) {
while (buffer.size() == maxBufferSize) {
try {
logger.debug("buffer is full, Producer thread waiting for consumer to take something from buffer");
// 當請求佇列為滿的狀態時,producer需要等待consumer從佇列中取走請求
buffer.wait();
}
catch (Exception ex) {
logger.exception(ex);
}
}
buffer.add(request);
// 新增新的請求後通知所有的consumer
buffer.notifyAll();
}
return true;
}
catch (Exception ex) {
logger.error("Producer : fail to add request");
logger.exception(ex);
return false;
}
}
}
這裡需要注意的是buffer作為多個執行緒共享的物件必須進行同步保護。否則buffer中請求的數量可能超過maxBufferSize。原因很簡單,如果我們設定maxBufferSize = 5,此時buffer為空,此時此刻有六個producer通過訪問buffer,那麼他們讀取buffer.size()的時候值都為0,於是6個producer同時執行add方法,最終buffer中請求的數量超過了5。
2. Consumer
消費者負責中佇列中取走請求,如果佇列為空消費者進入等待狀態,直到新的請求到達。
public class Consumer extends Thread {
// 訊息佇列
private LinkedList<Request> buffer = null;
private NormalLoggerI logger;
public Consumer(LinkedList<Request> buffer, NormalLoggerI logger) {
this.buffer = buffer;
this.logger = logger;
}
@Override
public void run() {
while (true) {
try {
Request request = null;
synchronized (buffer) {
while (buffer.isEmpty()) {
logger.debug("Queue is empty, Consumer thread is waiting for producer thread to put something in queue");
try {
// 當訊息佇列為空時,消費者等待
buffer.wait();
// 如果client執行緒通過呼叫消費者執行緒的中斷方法,並且測試佇列為空,則消費者執行緒退出
if (buffer.size() == 0 && Thread.currentThread().isInterrupted()) {
logger.debug("Consumer thread is exit in the waiting");
return;
}
}
catch (InterruptedException ex) {
// 如果消費者執行緒在等待的過程中接收到來自主執行緒的中斷請求,則消費者執行緒退出
logger.debug("Consumer thread is exit beacouse of InterruptedException");
return;
}
catch (Exception ex) {
logger.exception(ex);
}
}
// 從佇列中獲取一個請求
request = buffer.pop();
// 通知生產者程序:可以繼續新增訊息
buffer.notifyAll();
}
// Do Something
// 在處理完訊息後如果消費者執行緒檢測到中斷狀態,並且此時佇列為空,則消費者執行緒立即退出
synchronized (buffer) {
if (buffer.size() == 0 && Thread.currentThread().isInterrupted()) {
logger.debug("Consumer thread is exit beacouse of Interrupte status");
return;
}
}
}
catch (Exception ex) {
logger.exception(ex);
}
}
}
}
3. Service
Service的作用是管理生產者、消費者、佇列、以及服務的啟動和停止。
需要注意的是生產者在新增訊息之前必須檢查服務是否被停止,如果停止則返回錯誤。
public class Service{
private static Producer producer = null;
private static List<Consumer> consumers = new ArrayList<Consumer>();
private static LinkedList<Request> buffer = new LinkedList<Request>();
// isServiceStopped對於Service來說是全域性共享的所以標註為static
private static boolean isServiceStopped;
public static void init() {
producer = new Producer(buffer);
consumers.add(new Consumer(buffer)) ;
consumers.add(new Consumer(buffer)) ;
isServiceStopped = false;
// 啟動所有的consumer
for (Consumer c : consumers) {
c.start();
}
}
public static boolean addRequest(String type, String text, String mobile) {
try {
// addReuest方法可能在多個client程序中被訪問,所以必須使用同步機制,但是這會導致效能問題
synchronized(isServiceStopped) {
if (isServiceStopped) {
return false;
}
// 新增訊息
Request request = new Request();
return producer.add(request);
}
}
catch (Exception ex) {
logger.exception(ex);
return false;
}
}
public static void stop() {
synchronized(isServiceStopped) {
// 傳送中斷請求
for (Consumer c : consumers) {
c.interrupt();
}
isServiceStopped = true;
}
}
}
4. 不足
4.1 Consumer的異常處理
在這個Producer-Consumer模型的實現版本中如果consumer在處理請求的過程中發生異常,我只做了記錄日誌的處理。如果能實現retry就更好了。
4.2 Service的新能瓶頸
你是否注意到在Service的視線中addRequest()和stop()方法都需要鎖住isServiceStopped,而isServiceStopped是一個靜態變數。也就是說在同一時刻所有的client執行緒中只有一道執行緒可以訪問addRequest或者stop,這將導致client執行緒的等待,特別是在大併發的時候。
解決方法也很簡單,可以使用volatile特性,這是一個輕量級的鎖。