1. 程式人生 > >阻塞佇列及 wait notify模擬實現 BlockingQueue

阻塞佇列及 wait notify模擬實現 BlockingQueue

一、阻塞佇列

阻塞佇列與普通佇列的區別在於,當佇列是空的時,從佇列中獲取元素的操作將會被阻塞,或者當佇列是滿時,往佇列裡新增元素的操作會被阻塞。試圖從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其他的執行緒往空的佇列插入新的元素。

二、wait notify實現阻塞佇列

public class BlockingQueueDemo {
    //定義兩把鎖,只是簡單的鎖
    private Object full = new Object();
    private Object empty = new Object();

    private int[] array;
    private int head;
    private int last; 
    private int size;
    public BlockingQueueDemo(int maxSize) {
        this.head = 0; 
        this.last = 0;
        array = new int[maxSize];
    }
    public void put(int e) throws InterruptedException {
        synchronized

(full){
            while (size == array.length) {//沒有更多空間,需要阻塞
                full.wait();
            }
        }
        
        if (last < array.length) {
            array[last] = e;
            last++;
        } else {
            array[0] = e;
            last = 1;
        }
        size++;
        System.out.println(size+" :size大小,last: "+last+"    :e:   "+e);
        
        //放入資料以後,就可以喚醒obj2物件的鎖
        synchronized(empty){
            empty.notify();//達到了喚醒poll方法的條件
        }

    }
    
    public int pool() throws InterruptedException {
        synchronized(empty){
            while (size == 0) {//沒有資料,阻塞
                empty.wait();
            }
        }
        int returnValue = 0;
        //佇列中有資料,且head小於array的長度
        returnValue = array[head];
        array[head] = -1;
        System.out.println(returnValue + "              彈出的"+"head:"+ head);
        if (head < array.length) {//彈出head下標的資料
            head++;
        } else {
            head = 0;
        }
        size--; 
        //拿走資料後,喚醒full物件鎖
        synchronized(full){
            full.notify();
        }
        return returnValue;
    }
    
    public String toString(){
        for (int i = 0; i < array.length; i++) {
            System.out.println(array[i]);
        }
        return "";
    }
    
    public static void main(String[] args) throws InterruptedException {
        BlockingQueueDemo bq = new BlockingQueueDemo(5);
        bq.put(10);
        bq.put(20);
        bq.put(30);
        bq.put(40);
        bq.put(50);
        System.out.println();
        bq.pool();
        bq.pool();
        bq.pool();
        bq.pool();
        System.out.println();
        bq.toString();
        System.out.println();
        bq.put(100);
        bq.put(200);
        bq.put(300);
        bq.put(400);
        System.out.println();
        bq.toString();
    }

}

三、BlockingQueue

public class BlockingQueueDemo {
    public static class Producer implements Runnable{
        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        private Random random;


        public Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            flag=false;
            random=new Random();


        }
        public void run() {
            while(!flag){
                int info=random.nextInt(100);
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName()+" 生成了產品 "+info);
                    blockingQueue.put(info);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }               
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    //消費者
    public static class Consumer implements Runnable{
        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
        public void run() {
            while(!flag){
                int info;
                try {
                    System.out.println(Thread.currentThread().getName()+"   需要獲得一個產品");
                    info = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()+" 消費了一個產品 "+info);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }               
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    public static void main(String[] args) throws InterruptedException{
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10);
        Producer producer1=new Producer(blockingQueue);
        Consumer consumer1=new Consumer(blockingQueue);
        Consumer consumer2=new Consumer(blockingQueue);
        Consumer consumer3=new Consumer(blockingQueue);
        //執行緒池
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);
        service.execute(producer1);

        //執行10S結束
        Thread.sleep(10*1000);
        consumer1.shutDown();
        consumer2.shutDown();
        consumer3.shutDown();
        producer1.shutDown();
        service.shutdown();
    }
}