使用LinkedBlockingQueue來實現生產者消費者的例子
阿新 • • 發佈:2018-12-14
工作中,經常有將檔案中的資料匯入資料庫的表中,或者將資料庫表中的記錄儲存到檔案中。為了提高程式的處理速度,可以設定讀執行緒和寫執行緒,這些執行緒通過訊息佇列進行資料互動。本例就是使用了LinkedBlockingQueue來模仿生產者執行緒和消費者執行緒進行資料生產和消費。
為了方便,這些不同的類被寫在了一個類中,實際使用的時候,可以單獨拆開,舉一反三地使用。
以下是例子:
LinkedBlockingQueueDemo.java
import java.util.Date; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class LinkedBlockingQueueDemo { // 生產者執行緒數量 private final static int providerThreadAmount = 5; // 記錄每一個生產者執行緒是否處理完畢的標記 private static boolean[] providerDoneFlag = new boolean[providerThreadAmount]; // 整個所有的生產者執行緒全部結束的標記 private static boolean done = false; // 一個執行緒安全的佇列,用於生產者和消費者非同步地資訊互動 private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>(); static class ProviderThread extends Thread { private Thread thread; private String threadName; private int threadNo; public ProviderThread(String threadName2, int threadNo) { this.threadName = threadName2; this.threadNo = threadNo; } public void start() { if (thread == null) { thread = new Thread(this, threadName); } thread.start(); System.out.println( (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName()); } @Override public void run() { int rows = 0; for (int i = 0; i < 100; i++) { String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName()); // offer不會去阻塞執行緒,put會 //linkedBlockingQeque.offer(string); linkedBlockingQeque.put(string); rows++; /* * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch * (InterruptedException e) { e.printStackTrace(); } */ } // 本執行緒處理完畢的標記 LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true; System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t" + Thread.currentThread().getName()); } } static class ConsumerThread implements Runnable { private Thread thread; private String threadName; public ConsumerThread(String threadName2) { this.threadName = threadName2; } public void start() { if (thread == null) { thread = new Thread(this, threadName); } thread.start(); System.out.println( (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName()); } @Override public void run() { int rows = 0; // 生產者執行緒沒有結束,或者訊息佇列中有元素的時候,去佇列中取資料 while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) { try { //在甘肅電信的實際應用中發現,當資料的處理量達到千萬級的時候,帶引數的poll會將主機的幾百個G的記憶體耗盡,jvm會提示申請記憶體失敗,並將程序退出。網上說,這是這個方法的一個bug。 //String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS); String string = linkedBlockingQeque.poll(); if (string == null) { continue; } rows++; System.out .println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is " + string + "\t" + Thread.currentThread().getName()); /* * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch * (InterruptedException e) { e.printStackTrace(); } */ } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t" + Thread.currentThread().getName()); } } public static synchronized void setDone(boolean flag) { LinkedBlockingQueueDemo.done = flag; } public static synchronized boolean getDone() { return LinkedBlockingQueueDemo.done; } public static void main(String[] args) { System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName()); System.out.println( (new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode()); // 啟動若干生產者執行緒 for (int i = 0; i < providerThreadAmount; i++) { String threadName = String.format("%s-%d", "ProviderThread", i); ProviderThread providerThread = new ProviderThread(threadName, i); providerThread.start(); } // 啟動若干個消費者執行緒 for (int i = 0; i < 10; i++) { String threadName = String.format("%s-%d", "ConsumerThread", i); ConsumerThread consumerThread = new ConsumerThread(threadName); consumerThread.start(); } // 迴圈檢測生產者執行緒是否處理完畢 do { for (boolean b : providerDoneFlag) { if (b == false) { /* * try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) + * " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque. * size() + "\t" + Thread.currentThread().getName()); } catch * (InterruptedException e) { e.printStackTrace(); } */ // 只要有一個生產者執行緒沒有結束,則整個生產者執行緒檢測認為沒有結束 break; } LinkedBlockingQueueDemo.setDone(true); } // 生產者執行緒全部結束的時候,跳出檢測 if (LinkedBlockingQueueDemo.getDone() == true) { break; } } while (true); System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName()); } }
結果略。