1. 程式人生 > >Producer-Consumer模型:二、如何實現

Producer-Consumer模型:二、如何實現

Producer-Consumer模型的實現主要考慮一下幾個方便:

  • 生成者的實現
  • 消費者的實現
  • 訊息佇列
  • 執行緒的通訊、等待、同步
  • 執行緒的終止

1.Producer

生產者負責把請求加入佇列,如果佇列已滿則等待或者返回錯誤。

public class Producer{
    // 訊息佇列
    private LinkedList<Request> buffer = null;
    // 佇列的容量
    private int maxBufferSize = 5;
    private NormalLoggerI logger;

    public
Producer(LinkedList<Request> buffer, int maxBufferSize, NormalLoggerI logger) { this.buffer = buffer; this.maxBufferSize = maxBufferSize; this.logger = logger; } public boolean add(Request request) { try { // 請求佇列是競爭資源,只有一道程序可以讀寫 synchronized
(buffer) { while (buffer.size() == maxBufferSize) { try { logger.debug("buffer is full, Producer thread waiting for consumer to take something from buffer"); // 當請求佇列為滿的狀態時,producer需要等待consumer從佇列中取走請求 buffer.wait(); } catch
(Exception ex) { logger.exception(ex); } } buffer.add(request); // 新增新的請求後通知所有的consumer buffer.notifyAll(); } return true; } catch (Exception ex) { logger.error("Producer : fail to add request"); logger.exception(ex); return false; } } }

這裡需要注意的是buffer作為多個執行緒共享的物件必須進行同步保護。否則buffer中請求的數量可能超過maxBufferSize。原因很簡單,如果我們設定maxBufferSize = 5,此時buffer為空,此時此刻有六個producer通過訪問buffer,那麼他們讀取buffer.size()的時候值都為0,於是6個producer同時執行add方法,最終buffer中請求的數量超過了5。

2. Consumer

消費者負責中佇列中取走請求,如果佇列為空消費者進入等待狀態,直到新的請求到達。

public class Consumer extends Thread {
    // 訊息佇列
    private LinkedList<Request> buffer = null;
    private NormalLoggerI logger;

    public Consumer(LinkedList<Request> buffer, NormalLoggerI logger) {
        this.buffer = buffer;
        this.logger = logger;
    }

    @Override 
    public void run() {
        while (true) {
            try {
                Request request = null;
                synchronized (buffer) {
                    while (buffer.isEmpty()) {
                        logger.debug("Queue is empty, Consumer thread is waiting for producer thread to put something in queue");
                        try {
                            // 當訊息佇列為空時,消費者等待
                            buffer.wait();
                            // 如果client執行緒通過呼叫消費者執行緒的中斷方法,並且測試佇列為空,則消費者執行緒退出
                            if (buffer.size() == 0 && Thread.currentThread().isInterrupted()) {
                                logger.debug("Consumer thread is exit in the waiting");
                                return;
                            }
                        }
                        catch (InterruptedException ex) {
                            // 如果消費者執行緒在等待的過程中接收到來自主執行緒的中斷請求,則消費者執行緒退出
                            logger.debug("Consumer thread is exit beacouse of InterruptedException");
                            return;
                        }
                        catch (Exception ex) {
                            logger.exception(ex);
                        }
                    }
                    // 從佇列中獲取一個請求
                    request = buffer.pop();
                    // 通知生產者程序:可以繼續新增訊息
                    buffer.notifyAll();
                }

                // Do Something

                // 在處理完訊息後如果消費者執行緒檢測到中斷狀態,並且此時佇列為空,則消費者執行緒立即退出
                synchronized (buffer) {
                    if (buffer.size() == 0 && Thread.currentThread().isInterrupted()) {
                        logger.debug("Consumer thread is exit beacouse of Interrupte status");
                        return;
                    }
                }
            }
            catch (Exception ex) {
                logger.exception(ex);
            }
        }

    }
}

3. Service

Service的作用是管理生產者、消費者、佇列、以及服務的啟動和停止。

需要注意的是生產者在新增訊息之前必須檢查服務是否被停止,如果停止則返回錯誤。

public class Service{
    private static Producer producer = null;
    private static List<Consumer> consumers = new ArrayList<Consumer>();
    private static LinkedList<Request> buffer = new LinkedList<Request>();
    // isServiceStopped對於Service來說是全域性共享的所以標註為static
    private static boolean isServiceStopped;

    public static void init() {
        producer = new Producer(buffer);
        consumers.add(new Consumer(buffer)) ;
        consumers.add(new Consumer(buffer)) ;
        isServiceStopped = false;
        // 啟動所有的consumer
        for (Consumer c : consumers) {
            c.start();
        }
    }

    public static boolean addRequest(String type, String text, String mobile) {
        try {
            // addReuest方法可能在多個client程序中被訪問,所以必須使用同步機制,但是這會導致效能問題
            synchronized(isServiceStopped) {
                if (isServiceStopped) {
                    return false;
                }
                // 新增訊息
                Request request = new Request();
                return producer.add(request);
            }
        }
        catch (Exception ex) {
            logger.exception(ex);
            return false;
        }
    }

    public static void stop() {
        synchronized(isServiceStopped) {
            // 傳送中斷請求
            for (Consumer c : consumers) {
                c.interrupt();
            }
            isServiceStopped = true;
        }
    }
}

4. 不足

4.1 Consumer的異常處理

在這個Producer-Consumer模型的實現版本中如果consumer在處理請求的過程中發生異常,我只做了記錄日誌的處理。如果能實現retry就更好了。

4.2 Service的新能瓶頸

你是否注意到在Service的視線中addRequest()和stop()方法都需要鎖住isServiceStopped,而isServiceStopped是一個靜態變數。也就是說在同一時刻所有的client執行緒中只有一道執行緒可以訪問addRequest或者stop,這將導致client執行緒的等待,特別是在大併發的時候。
解決方法也很簡單,可以使用volatile特性,這是一個輕量級的鎖。