1. 程式人生 > 實用技巧 >ArrayBlockingQueue實現生產者、消費者模式

ArrayBlockingQueue實現生產者、消費者模式

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer/consumer demo built on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>{@code PRODUCER_COUNT} producer tasks each put {@code PRODUCT_THREAD} distinct integers on
 * the queue; {@code CONSUMER_COUNT} consumer tasks take them off, record them in a shared set and
 * print them. When every produced value has been consumed, the main thread stops the pool and
 * prints the number of distinct values seen.
 *
 * <p>Thread-safety notes:
 * <ul>
 *   <li>Values are generated with an {@link AtomicInteger}; a plain {@code int++} shared between
 *       producers can hand out the same value twice, in which case the expected number of
 *       distinct values is never reached and the program never terminates.</li>
 *   <li>The set of consumed values is a concurrent set because many consumers add to it at
 *       once.</li>
 *   <li>Only the main thread shuts the pool down. Workers never call {@code shutdownNow()}, so
 *       the order in which producers and consumers are submitted cannot cause a
 *       {@code RejectedExecutionException}.</li>
 * </ul>
 */
public class Main {

    /** Capacity of the bounded hand-off queue. */
    private static final int QUEUE_CAPACITY = 100;

    /** Number of consumer tasks. */
    private static final int CONSUMER_COUNT = 20;

    /** Number of producer tasks. */
    private static final int PRODUCER_COUNT = 2;

    /** Number of values each producer task puts on the queue. */
    private static final int PRODUCT_THREAD = 100;

    /** Total number of values produced by all producers. */
    private static final int SUM_PRODUCT = PRODUCT_THREAD * PRODUCER_COUNT;

    /**
     * Runs the demo: starts consumers and producers, waits until every produced value has been
     * consumed, stops the pool and prints the number of distinct values consumed.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> abq = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        Set<Integer> hashSet = ConcurrentHashMap.newKeySet();
        AtomicInteger nextValue = new AtomicInteger();
        // Counts down once per distinct value consumed; reaching zero means all work is done.
        CountDownLatch allConsumed = new CountDownLatch(SUM_PRODUCT);
        ExecutorService exec = Executors.newCachedThreadPool();

        try {
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                // execute() rather than submit(): no Future is kept, so failures should surface.
                exec.execute(() -> consume(abq, hashSet, allConsumed));
            }
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                exec.execute(() -> produce(abq, nextValue));
            }
            allConsumed.await();
        } finally {
            // Consumers block forever in take(); interrupting them is the only way to end them.
            exec.shutdownNow();
        }

        // Block (without spinning) until every task has actually finished.
        while (!exec.awaitTermination(1, TimeUnit.SECONDS)) {
            // keep waiting
        }
        System.out.println("hashSet.size():" + hashSet.size());
    }

    /**
     * Consumer task: repeatedly takes a value from the queue, records and prints it, until the
     * thread is interrupted.
     *
     * @param queue     queue to take values from
     * @param seen      thread-safe set collecting every consumed value
     * @param remaining latch counted down once for each value newly added to {@code seen}
     */
    private static void consume(
            BlockingQueue<Integer> queue, Set<Integer> seen, CountDownLatch remaining) {
        System.out.println("Consume Thread Run!");
        try {
            while (true) {
                Integer val = queue.take();
                boolean added = seen.add(val);
                System.out.println(val);
                // Count down only after printing so the value is visible before shutdown starts.
                if (added) {
                    remaining.countDown();
                }
            }
        } catch (InterruptedException e) {
            // take() clears the interrupt flag when it throws; restore it for the pool thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Producer task: puts {@code PRODUCT_THREAD} unique values on the queue, stopping early if
     * the thread is interrupted.
     *
     * @param queue     queue to put values on
     * @param nextValue shared generator that hands out each value exactly once
     */
    private static void produce(BlockingQueue<Integer> queue, AtomicInteger nextValue) {
        System.out.println("Produce Thread Run!");
        try {
            for (int i = 0; i < PRODUCT_THREAD; i++) {
                System.out.println("putting..");
                queue.put(nextValue.getAndIncrement());
            }
        } catch (InterruptedException e) {
            // Stop producing and preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}

注意:這裡報了一個不應該出現的異常!

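如果先 submit 生產者,並且消費者執行緒過多(比如 200 個),則會拋出 java.util.concurrent.RejectedExecutionException 異常!原因是:執行緒池一旦被 shutdownNow() 關閉,之後再呼叫 submit() 提交的任務就會被拒絕;當生產者先執行完畢時,已經啟動的消費者可能在其餘消費者尚未提交之前就發現資料已全部消費並關閉了執行緒池。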