BlockingQueue的實現、使用 以及 生產者消費者的BlockingQueue實現
阿新 • • 發佈:2019-02-05
BlockingQueue
- BlockingQueue介面主要實現:
- 主要有ArrayBlockingQueue(基於陣列,有界佇列)、LinkedBlockingQueue(無界佇列)。
- 關鍵是在Blocking上,它會讓服務執行緒在佇列為空時,進行等待,當有新的訊息進入佇列後,自動將執行緒喚醒。
BlockingQueue的實現與使用
- ArrayBlockingQueue的內部元素都放在一個物件陣列中,
final Object[] items;
- 壓入元素:offer()-如果佇列滿了,返回false; put()-如果佇列滿了,它會一致等待,直到有空閒的位置。
- 彈出元素:poll()-如果佇列為空直接返回null,take()-會等到佇列中有可用元素。
- 使用 put() , take()
- 底層實現: 通過以下欄位實現put(), take()
// 1.欄位
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
- put(), taker()
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
生產者消費者的BlockingQueue實現
- Producer:
public class Producer implements Runnable{
BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
try{
String product_name = "a product-"+Thread.currentThread().getName();
System.out.println("I have produced:"+Thread.currentThread().getName());
queue.put(product_name); //如果隊滿,則阻塞
}catch (Exception e){
e.printStackTrace();
}
}
}
- Consumer:
public class consumer implements Runnable{
BlockingQueue<String> blockingQueue;
public consumer(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
String product_name = blockingQueue.take(); //如果隊空,則阻塞
System.out.println("消費了:"+product_name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- test:
public class test_producer {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingDeque<>(2);
//BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
// LinkedBlockingDeque 預設大小為Integer.MAX_VALUE
for (int i = 0; i < 5; i++) {
new Thread(new Producer(queue), "Producer"+i).start();
new Thread(new consumer(queue), "Consumer"+i).start();
}
}
}
- 結果
I have produced:Producer0
消費了:a product-Producer0
I have produced:Producer1
消費了:a product-Producer1
I have produced:Producer2
消費了:a product-Producer2
I have produced:Producer3
I have produced:Producer4
消費了:a product-Producer3
消費了:a product-Producer4