1. 程式人生 > >BlockingQueue的實現、使用 以及 生產者消費者的BlockingQueue實現

BlockingQueue的實現、使用 以及 生產者消費者的BlockingQueue實現

BlockingQueue

  • BlockingQueue介面主要實現:

這裡寫圖片描述

  • 主要有ArrayBlockingQueue(基於陣列,有界佇列)LinkedBlockingQueue(無界佇列)
  • 關鍵是在Blocking上,它會讓服務執行緒在佇列為空時,進行等待,當有新的訊息進入佇列後,自動將執行緒喚醒。

BlockingQueue的實現與使用

  • ArrayBlockingQueue的內部元素都放在一個物件陣列中,final Object[] items;
  • 壓入元素:offer()-如果佇列滿了,返回false; put()-如果佇列滿了,它會一致等待,直到有空閒的位置。
  • 彈出元素:poll()-如果佇列為空直接返回null,take()-會等到佇列中有可用元素。
  • 使用 put() , take()
  • 底層實現: 通過以下欄位實現put(), take()
    // 1.欄位
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
  • put(), taker()
    public
void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

生產者消費者的BlockingQueue實現

  • Producer:
public class Producer implements Runnable{
    BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try{
            String product_name = "a product-"+Thread.currentThread().getName();
            System.out.println("I have produced:"+Thread.currentThread().getName());
            queue.put(product_name); //如果隊滿,則阻塞
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • Consumer:
public class consumer implements Runnable{
    BlockingQueue<String> blockingQueue;

    public consumer(BlockingQueue<String> blockingQueue){
        this.blockingQueue = blockingQueue;
    }


    @Override
    public void run() {
        try {
            String product_name = blockingQueue.take(); //如果隊空,則阻塞
            System.out.println("消費了:"+product_name);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • test:
public class test_producer {

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(2);
        //BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        // LinkedBlockingDeque 預設大小為Integer.MAX_VALUE
        for (int i = 0; i < 5; i++) {
            new Thread(new Producer(queue), "Producer"+i).start();
            new Thread(new consumer(queue), "Consumer"+i).start();
        }
    }
}
  • 結果

I have produced:Producer0
消費了:a product-Producer0
I have produced:Producer1
消費了:a product-Producer1
I have produced:Producer2
消費了:a product-Producer2
I have produced:Producer3
I have produced:Producer4
消費了:a product-Producer3
消費了:a product-Producer4