1. 程式人生 > >Java使用阻塞佇列BlockingQueue實現生產者消費者

Java使用阻塞佇列BlockingQueue實現生產者消費者

什麼是阻塞佇列

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。

  • 1、支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。
  • 2、支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。

阻塞佇列常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。
阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。

Java中提供了幾個對BlockingQueue的實現類,如: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 等

在處理生產者/消費者問題上 我們將會使用ArrayBlockingQueue來實現,如下是我們需知道的重要方法:

  • put(E e): 這個方法用於向佇列中插入元素,如果佇列已滿,需要等待可用的這間。
  • E take(): 這個方法用於從佇列頭部獲取或者移除元素,如果佇列為空則需要等待可用的元素。

使用BlockingQueue來解決生產者/消費者 示例

Mantou類

Producer產生的普通Java物件,並新增到佇列中。

/**
 * Producer產生的饅頭類
 * @author itmyhome
 * 
 */
public class Mantou {
    private
String mantou; public Mantou(String mantou) { this.mantou = mantou; } public String getMantou() { return mantou; } public void setMantou(String mantou) { this.mantou = mantou; } }

Producer生產者類

Producer這個類會產生訊息並將其放入佇列中。

import java.util.concurrent.BlockingQueue;

public
class Producer implements Runnable { BlockingQueue<Mantou> queue; public Producer(BlockingQueue<Mantou> queue) { this.queue = queue; } @Override public void run() { // 生產饅頭 for (int i = 0; i < 100; i++) { Mantou mt = new Mantou("" + i); try { Thread.sleep(100); queue.put(mt); System.out.println("生產饅頭: " + mt.getMantou()); } catch (InterruptedException e) { e.printStackTrace(); } } // 新增退出訊息 Mantou msg = new Mantou("exit"); try { queue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } }

Consumer消費者類

Consumer類會從佇列獲取訊息進行處理。如果獲取的是退出訊息則結束。

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    BlockingQueue<Mantou> queue;

    public Consumer(BlockingQueue<Mantou> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Mantou mantou;
            // 獲取並處理訊息直到接收到“exit”訊息
            while (!(mantou = queue.take()).getMantou().equals("exit")) {
                Thread.sleep(100);
                System.out.println("消費饅頭: " + mantou.getMantou());
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

ProducerConsumerService

生產者/消費者的服務類將會產生固定大小的BlockingQueue,生產者和消費者同時共享該BlockingQueue,該服務類會起啟動生產者和消費者執行緒。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author itmyhome
 *
 */
public class ProducerConsumerService {

    public static void main(String[] args) {
        // 建立大小為10的 BlockingQueue
        BlockingQueue<Mantou> queue = new ArrayBlockingQueue<Mantou>(10);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        // 開啟 producer執行緒向佇列中生產訊息
        new Thread(producer).start();
        // 開啟 consumer執行緒 中佇列中消費訊息
        new Thread(consumer).start();
        System.out.println("Producer and Consumer has been started");

    }
}

程式執行結果:

Producer and Consumer has been started
生產饅頭: 0
生產饅頭: 1
消費饅頭: 0
消費饅頭: 1
生產饅頭: 2
消費饅頭: 2
生產饅頭: 3
消費饅頭: 3
生產饅頭: 4
消費饅頭: 4
生產饅頭: 5
消費饅頭: 5
生產饅頭: 6
消費饅頭: 6
......

參考