1. 程式人生 > >LinkedBlockingQueue 實現生產者消費者模型

LinkedBlockingQueue 實現生產者消費者模型

/**
 * 市場演示倉庫
 *
 * @author wei.Li by 14-8-21.
 */
public class MarketStorage {

    //生產者執行緒池
    protected static final ExecutorService EXECUTOR_SERVICE_PRODUCER
            = Executors.newFixedThreadPool(10);
    //啟動生產者執行緒數量
    protected static final int PRODUCER_THREAD_NUM = 2;
    //生產者執行緒睡眠隨機最大時間
    protected static final int PRODUCER_THREAD_SLEEP = 200;
    //生產者生成物件次數
    protected static AtomicInteger getProducerObj_Count = new AtomicInteger(0);
    //是否生產
    protected static boolean isRun_Producer = true;


    //消費者執行緒池
    protected static final ExecutorService EXECUTOR_SERVICE_CONSUMER
            = Executors.newFixedThreadPool(10);
    //啟動消費者執行緒數量
    protected static final int CONSUMER_THREAD_NUM = 20;
    //消費者執行緒睡眠隨機最大時間
    protected static final int CONSUMER_THREAD_SLEEP = 1000;
    //消費者消費物件次數
    protected static AtomicInteger getConsumerObj_Count = new AtomicInteger(0);
    //是否消費
    protected static boolean isRun_Cousumer = true;

    //市場倉庫-儲存資料的佇列 預設倉庫容量大小100
    /**
     * @see com.java.queue.LinkedBlockingQueue_#linkedBlockingQueue2Void()
     */
    protected static LinkedBlockingQueue<CommodityObj> blockingQueue
            = new LinkedBlockingQueue<CommodityObj>(100);

    /**
     * 生成生產者執行緒
     */
    private static void runProducer() {
        for (int i = 0; i < PRODUCER_THREAD_NUM; i++) {
            EXECUTOR_SERVICE_PRODUCER.submit(new Producer());
        }
    }

    /**
     * 生成消費者執行緒生成
     */
    private static void runConsumer() {
        for (int i = 0; i < CONSUMER_THREAD_NUM; i++) {
            Thread thread = new Thread(new Consumer());
            EXECUTOR_SERVICE_CONSUMER.submit(thread);
        }
    }

    /**
     * 停止執行緒生產與消費
     * 關閉執行緒池
     */
    private static void shumdown() {
        if (!EXECUTOR_SERVICE_PRODUCER.isShutdown()) {
            isRun_Producer = false;
            EXECUTOR_SERVICE_PRODUCER.shutdown();
        }
        if (!EXECUTOR_SERVICE_CONSUMER.isShutdown()) {
            isRun_Cousumer = false;
            EXECUTOR_SERVICE_CONSUMER.shutdown();
        }
    }


    public static void main(String[] args) {
        runConsumer();
        runProducer();

        /**
         * 1 min 後停止執行
         */
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                shumdown();
                System.out.println("~~~~~~~~~~~~ shumdown done ~~~~~~~~~~~~~~");
            }
        }, 1000 * 60L);
    }
}