1. 程式人生 > 其它 >JAVA利用阻塞佇列實現生產者消費者模式

JAVA利用阻塞佇列實現生產者消費者模式

思路

  1. 需要一個ServiceData類代表業務資料
  2. 需要一個DataProducer類代表生產者去生產業務資料
  3. 需要一個DataConsumer類代表消費者去消費/處理業務資料
  4. 需要一個執行緒池去管理生產者消費者執行緒

業務資料

/**
 * 業務資料
 */
public class ServiceData {
    // TODO: setter/getter省略 
    //業務資料屬性
}

生產者程式碼

import java.util.concurrent.*;
/**
 * 生產者
 */
public class DataProducer implements Runnable {

    private BlockingQueue<ServiceData> queue;
    
    public DataProducer(BlockingQueue<ServiceData> queue) {
        this.queue = queue;
    }
    
    @Override
    public void run() {
        //模擬生產十萬個數據
        for (int i = 0; i < 100000; i++) {
            try {
                ServiceData data = new ServiceData();
                //todo 生產資料過程
                queue.put(data);
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

消費者程式碼

import java.util.concurrent.*;
/**
 * 消費者
 */
public class DataConsumer implements Runnable {

    private BlockingQueue<ServiceData> queue;
    
    public DataConsumer(BlockingQueue<ServiceData> queue) {
        this.queue = queue;
    }

    /**
     * 當前任務是否完成
     */
    private boolean finished;
    public boolean isFinished() {
        return finished;
    }

    /**
     * 判斷所有消費者都完成任務
     * @param consumers
     * @return
     */
    public static boolean isAllFinished(DataConsumer... consumers) {
        for (DataConsumer consumer : consumers) {
            if (!consumer.isFinished()) {
                return false;
            }
        }
        return true;
    }
    
    @Override
    public void run() {
        //不停消費資料,沒有資料時阻塞
        while (true) {
            try {
                ServiceData data = queue.take();
                finished = false;
                //todo 消費資料過程
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                finished = true;
            }
        }
    }
}

執行緒池

import java.util.concurrent.*;

public class Test {
    public static void main(String[] args) {
        //建立一個有三個固定執行緒的執行緒池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //建立任務佇列
        BlockingQueue<PdfUrl> queue = new LinkedBlockingQueue<>();
        //建立一個生產者
        DataProducer producer1 = new DataProducer(queue);
        //建立兩個消費者
        DataConsumer consumer1 = new DataConsumer(queue);
        DataConsumer consumer2 = new DataConsumer(queue);
        //啟動任務
        executorService.execute(producer1);
        executorService.execute(consumer1);
        executorService.execute(consumer2);
        //生產者生產完成後,佇列空了並且消費完成,則結束
        while (true) {
            if (queue.isEmpty() && DataConsumer.isAllFinished(consumer1,consumer2)) {
                break;
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //關閉執行緒池,立即stop執行緒
        threadPool.shutdownNow();
    }
}
不積跬步無以至千里