BoundedBuffer實現生產者、消費者模式
阿新 • • 發佈:2020-08-27
BoundedBuffer類來自於Condition介面的註釋程式碼段!
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition();final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0;++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0;--count; notFull.signal(); return x; } finally { lock.unlock(); } } }
測試程式碼:
import java.util.HashSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class Main { private static int Init = 0; private static HashSet<Integer> hashSet = new HashSet(); private static volatile boolean Finish = true; public static void main(String[] args) throws InterruptedException { BoundedBuffer boundedBuffer = new BoundedBuffer(); ExecutorService exec = Executors.newCachedThreadPool(); int CONSUMER_COUNT = 20; int PRODUCER_COUNT = 2; int PRODUCT_THREAD = 100; int SUM_PRODUCT = PRODUCT_THREAD*PRODUCER_COUNT; for (int i = 0; i < PRODUCER_COUNT; i++) { exec.submit(new Runnable() { @Override public void run() { System.out.println("Produce Thread Run!"); for (int i = 0; i < PRODUCT_THREAD; i++) { try { System.out.println("putting.."); boundedBuffer.put(Integer.valueOf(Init++)); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } for (int i = 0; i < CONSUMER_COUNT; i++) { exec.submit(new Runnable() { @Override public void run() { System.out.println("Produce Thread Run!"); for (; !Thread.interrupted(); ) { try { if (hashSet.size() == SUM_PRODUCT) { exec.shutdownNow(); //Finish = false; } Integer val = (Integer) boundedBuffer.take(); hashSet.add(val); System.out.println(val); } catch (InterruptedException e) { //take()發出的中斷訊號被catch後,標誌為將被清楚,要想被for捕捉到,必須重新設定中斷! if (e instanceof InterruptedException){ Thread.currentThread().interrupt(); } } } } }); } exec.shutdown(); //阻塞,等待所有任務執行完畢! for (;!exec.awaitTermination(10, TimeUnit.NANOSECONDS);); System.out.println("hashSet.size():" + hashSet.size()); } }
注意:注意這裡中斷消費者執行緒的方式!