阻塞佇列,以及使用 wait/notify 模擬實現 BlockingQueue
一、阻塞佇列
阻塞佇列與普通佇列的區別在於:當佇列為空時,從佇列中獲取元素的操作會被阻塞;當佇列已滿時,往佇列裡新增元素的操作會被阻塞。試圖從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其他執行緒往佇列中插入新的元素;同樣,試圖往已滿的阻塞佇列中新增元素的執行緒也會被阻塞,直到其他執行緒從佇列中取走元素。
二、使用 wait/notify 實現阻塞佇列
/**
 * A bounded, blocking FIFO queue of {@code int} values backed by a circular array and
 * coordinated with {@code wait}/{@code notifyAll}.
 *
 * <p>All mutable state ({@code head}, {@code last}, {@code size} and the array contents) is
 * read and written only while holding one private monitor. Waiting on the same monitor that
 * guards the condition being tested is what prevents lost wake-ups: a thread can never observe
 * "queue full/empty", lose the CPU, and then start waiting after the matching notification
 * has already been sent.
 */
public class BlockingQueueDemo {
    /** Single monitor guarding every field below; producers and consumers both wait on it. */
    private final Object lock = new Object();
    /** Circular buffer holding the queued values. */
    private final int[] array;
    /** Index of the next element to remove; wrapped to 0 right before use. */
    private int head;
    /** Index of the next free slot; wrapped to 0 right before use. */
    private int last;
    /** Number of elements currently stored. */
    private int size;

    /**
     * Creates an empty queue.
     *
     * @param maxSize capacity of the queue; must be positive
     * @throws IllegalArgumentException if {@code maxSize} is not positive (a zero-capacity
     *     queue would block every {@code put} and every {@code poll} forever)
     */
    public BlockingQueueDemo(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
        }
        this.array = new int[maxSize];
        this.head = 0;
        this.last = 0;
    }

    /**
     * Appends {@code e} to the tail of the queue, blocking while the queue is full.
     *
     * @param e the value to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void put(int e) throws InterruptedException {
        synchronized (lock) {
            // Re-check the condition in a loop: wait() may return spuriously, and another
            // producer may have taken the free slot before this thread re-acquired the lock.
            while (size == array.length) {
                lock.wait();
            }
            if (last == array.length) {
                last = 0; // wrap around to the start of the circular buffer
            }
            array[last] = e;
            last++;
            size++;
            System.out.println(size + " :size大小,last: " + last + " :e: " + e);
            // Producers and consumers share one monitor, so wake everybody; each woken
            // thread re-evaluates its own condition.
            lock.notifyAll();
        }
    }

    /**
     * Removes and returns the value at the head of the queue, blocking while the queue is empty.
     * The vacated slot is overwritten with {@code -1} so the demo output shows it as consumed.
     *
     * @return the oldest value in the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting for data
     */
    public int poll() throws InterruptedException {
        synchronized (lock) {
            while (size == 0) {
                lock.wait();
            }
            // Wrap BEFORE reading: head may equal array.length after the previous removal,
            // and indexing with it would be out of bounds.
            if (head == array.length) {
                head = 0;
            }
            int returnValue = array[head];
            array[head] = -1;
            System.out.println(returnValue + " 彈出的" + "head:" + head);
            head++;
            size--;
            lock.notifyAll();
            return returnValue;
        }
    }

    /**
     * Legacy, misspelled name of {@link #poll()}, retained so existing callers keep working.
     *
     * @return the oldest value in the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting for data
     */
    public int pool() throws InterruptedException {
        return poll();
    }

    /**
     * Returns the raw contents of the backing array, one slot per line, in index order
     * (not in queue order). Consumed slots show {@code -1}; never-used slots show {@code 0}.
     *
     * @return the backing array's slots joined by the platform line separator
     */
    @Override
    public String toString() {
        synchronized (lock) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < array.length; i++) {
                if (i > 0) {
                    sb.append(System.lineSeparator());
                }
                sb.append(array[i]);
            }
            return sb.toString();
        }
    }

    /**
     * Single-threaded walkthrough: fill the queue, drain four elements, then refill it so the
     * circular buffer wraps around, printing the backing array along the way.
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueueDemo bq = new BlockingQueueDemo(5);
        bq.put(10);
        bq.put(20);
        bq.put(30);
        bq.put(40);
        bq.put(50);
        System.out.println();
        bq.poll();
        bq.poll();
        bq.poll();
        bq.poll();
        System.out.println();
        System.out.println(bq);
        System.out.println();
        bq.put(100);
        bq.put(200);
        bq.put(300);
        bq.put(400);
        System.out.println();
        System.out.println(bq);
    }
}
三、使用 BlockingQueue 實現生產者與消費者
/**
 * Producer/consumer demo built on {@link BlockingQueue}: one producer puts random integers
 * into a bounded queue and three consumers take them out.
 *
 * <p>Workers stop in one of two ways: their {@code shutDown()} flag is checked at the top of
 * every loop iteration, and an interrupt ends {@code run()} immediately. The interrupt path is
 * required for consumers, because a consumer blocked in {@code take()} on an empty queue never
 * gets back to the flag check.
 */
public class BlockingQueueDemo {
    /** Producer: every two seconds generates a random number in [0, 100) and enqueues it. */
    public static class Producer implements Runnable {
        private final BlockingQueue<Integer> blockingQueue;
        /** Stop request; volatile so a write from another thread is visible to {@code run()}. */
        private volatile boolean flag;
        private final Random random;

        /**
         * @param blockingQueue the queue that produced values are put into
         */
        public Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            this.flag = false;
            this.random = new Random();
        }

        /** Produces values until {@link #shutDown()} is called or the thread is interrupted. */
        @Override
        public void run() {
            while (!flag) {
                int info = random.nextInt(100);
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " 生成了產品 " + info);
                    blockingQueue.put(info);
                } catch (InterruptedException e) {
                    // Interruption is a cancellation request: restore the status for the
                    // executor and stop, instead of swallowing it and looping again.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the producer to stop before starting its next iteration. */
        public void shutDown() {
            flag = true;
        }
    }

    /** Consumer: repeatedly takes one value from the queue and reports it. */
    public static class Consumer implements Runnable {
        private final BlockingQueue<Integer> blockingQueue;
        /** Stop request; volatile so a write from another thread is visible to {@code run()}. */
        private volatile boolean flag;

        /**
         * @param blockingQueue the queue that values are taken from
         */
        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        /** Consumes values until {@link #shutDown()} is called or the thread is interrupted. */
        @Override
        public void run() {
            while (!flag) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 需要獲得一個產品");
                    int info = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName() + " 消費了一個產品 " + info);
                } catch (InterruptedException e) {
                    // take() was interrupted while blocked: treat it as cancellation.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Asks the consumer to stop before starting its next iteration. A consumer currently
         * blocked in {@code take()} must additionally be interrupted to notice the request.
         */
        public void shutDown() {
            flag = true;
        }
    }

    /** Runs one producer and three consumers on a thread pool for ten seconds, then stops them. */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10);
        Producer producer1 = new Producer(blockingQueue);
        Consumer consumer1 = new Consumer(blockingQueue);
        Consumer consumer2 = new Consumer(blockingQueue);
        Consumer consumer3 = new Consumer(blockingQueue);
        // Thread pool running the workers.
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);
        service.execute(producer1);
        // Let the demo run for 10 seconds, then stop everything.
        Thread.sleep(10 * 1000);
        consumer1.shutDown();
        consumer2.shutDown();
        consumer3.shutDown();
        producer1.shutDown();
        // shutdownNow() interrupts the workers; plain shutdown() would leave consumers blocked
        // in take() forever and keep the JVM alive.
        service.shutdownNow();
    }
}