JAVA利用阻塞佇列實現生產者消費者模式
阿新 • • 發佈:2022-03-01
思路
- 需要一個
ServiceData
類代表業務資料 - 需要一個
DataProducer
類代表生產者去生產業務資料 - 需要一個
DataConsumer
類代表消費者去消費/處理業務資料 - 需要一個執行緒池去管理生產者消費者執行緒
業務資料
/**
* 業務資料
*/
public class ServiceData {
// TODO: setter/getter省略
//業務資料屬性
}
生產者程式碼
import java.util.concurrent.*; /** * 生產者 */ public class DataProducer implements Runnable { private BlockingQueue<ServiceData> queue; public DataProducer(BlockingQueue<ServiceData> queue) { this.queue = queue; } @Override public void run() { //模擬生產十萬個數據 for (int i = 0; i < 100000; i++) { try { ServiceData data = new ServiceData(); //todo 生產資料過程 queue.put(data); }catch (Exception e) { e.printStackTrace(); } } } }
消費者程式碼
import java.util.concurrent.*; /** * 消費者 */ public class DataConsumer implements Runnable { private BlockingQueue<ServiceData> queue; public DataConsumer(BlockingQueue<ServiceData> queue) { this.queue = queue; } /** * 當前任務是否完成 */ private boolean finished; public boolean isFinished() { return finished; } /** * 判斷所有消費者都完成任務 * @param consumers * @return */ public static boolean isAllFinished(DataConsumer... consumers) { for (DataConsumer consumer : consumers) { if (!consumer.isFinished()) { return false; } } return true; } @Override public void run() { //不停消費資料,沒有資料時阻塞 while (true) { try { ServiceData data = queue.take(); finished = false; //todo 消費資料過程 }catch (Exception e) { e.printStackTrace(); }finally { finished = true; } } } }
執行緒池
不積跬步無以至千里import java.util.concurrent.*; public class Test { public static void main(String[] args) { //建立一個有三個固定執行緒的執行緒池 ExecutorService executorService = Executors.newFixedThreadPool(3); //建立任務佇列 BlockingQueue<PdfUrl> queue = new LinkedBlockingQueue<>(); //建立一個生產者 DataProducer producer1 = new DataProducer(queue); //建立兩個消費者 DataConsumer consumer1 = new DataConsumer(queue); DataConsumer consumer2 = new DataConsumer(queue); //啟動任務 executorService.execute(producer1); executorService.execute(consumer1); executorService.execute(consumer2); //生產者生產完成後,佇列空了並且消費完成,則結束 while (true) { if (queue.isEmpty() && DataConsumer.isAllFinished(consumer1,consumer2)) { break; } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } //關閉執行緒池,立即stop執行緒 threadPool.shutdownNow(); } }