Java使用阻塞佇列BlockingQueue實現生產者消費者
阿新 • • 發佈:2019-01-26
什麼是阻塞佇列
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。
- 1、支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。
- 2、支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
阻塞佇列常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。
阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。
Java中提供了幾個對BlockingQueue的實現類,如: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 等
在處理生產者/消費者問題上 我們將會使用ArrayBlockingQueue來實現,如下是我們需知道的重要方法:
- put(E e): 這個方法用於向佇列中插入元素,如果佇列已滿,需要等待可用的這間。
- E take(): 這個方法用於從佇列頭部獲取或者移除元素,如果佇列為空則需要等待可用的元素。
使用BlockingQueue來解決生產者/消費者 示例
Mantou類
Producer產生的普通Java物件,並新增到佇列中。
/**
* Producer產生的饅頭類
* @author itmyhome
*
*/
public class Mantou {
private String mantou;
public Mantou(String mantou) {
this.mantou = mantou;
}
public String getMantou() {
return mantou;
}
public void setMantou(String mantou) {
this.mantou = mantou;
}
}
Producer生產者類
Producer這個類會產生訊息並將其放入佇列中。
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
BlockingQueue<Mantou> queue;
public Producer(BlockingQueue<Mantou> queue) {
this.queue = queue;
}
@Override
public void run() {
// 生產饅頭
for (int i = 0; i < 100; i++) {
Mantou mt = new Mantou("" + i);
try {
Thread.sleep(100);
queue.put(mt);
System.out.println("生產饅頭: " + mt.getMantou());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 新增退出訊息
Mantou msg = new Mantou("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Consumer消費者類
Consumer類會從佇列獲取訊息進行處理。如果獲取的是退出訊息則結束。
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
BlockingQueue<Mantou> queue;
public Consumer(BlockingQueue<Mantou> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Mantou mantou;
// 獲取並處理訊息直到接收到“exit”訊息
while (!(mantou = queue.take()).getMantou().equals("exit")) {
Thread.sleep(100);
System.out.println("消費饅頭: " + mantou.getMantou());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ProducerConsumerService
生產者/消費者的服務類將會產生固定大小的BlockingQueue,生產者和消費者同時共享該BlockingQueue,該服務類會起啟動生產者和消費者執行緒。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author itmyhome
*
*/
public class ProducerConsumerService {
public static void main(String[] args) {
// 建立大小為10的 BlockingQueue
BlockingQueue<Mantou> queue = new ArrayBlockingQueue<Mantou>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 開啟 producer執行緒向佇列中生產訊息
new Thread(producer).start();
// 開啟 consumer執行緒 中佇列中消費訊息
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
程式執行結果:
Producer and Consumer has been started
生產饅頭: 0
生產饅頭: 1
消費饅頭: 0
消費饅頭: 1
生產饅頭: 2
消費饅頭: 2
生產饅頭: 3
消費饅頭: 3
生產饅頭: 4
消費饅頭: 4
生產饅頭: 5
消費饅頭: 5
生產饅頭: 6
消費饅頭: 6
......