1. 程式人生 > 實用技巧 >BoundedBuffer實現生產者、消費者模式

BoundedBuffer實現生產者、消費者模式

BoundedBuffer類來自於Condition介面的註釋程式碼段!

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition();
   final Condition notEmpty = lock.newCondition(); 

   
final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0;
++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0;
--count; notFull.signal(); return x; } finally { lock.unlock(); } } }

測試程式碼:

import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    private static int Init = 0;
    private static HashSet<Integer> hashSet = new HashSet();
    private static volatile boolean Finish = true;

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer boundedBuffer = new BoundedBuffer();
        ExecutorService exec = Executors.newCachedThreadPool();
        int CONSUMER_COUNT = 20;
        int PRODUCER_COUNT = 2;
        int PRODUCT_THREAD = 100;
        int SUM_PRODUCT = PRODUCT_THREAD*PRODUCER_COUNT;
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            exec.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Produce Thread Run!");
                    for (int i = 0; i < PRODUCT_THREAD; i++) {
                        try {
                            System.out.println("putting..");
                            boundedBuffer.put(Integer.valueOf(Init++));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }

        for (int i = 0; i < CONSUMER_COUNT; i++) {
            exec.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Produce Thread Run!");
                    for (; !Thread.interrupted(); ) {
                        try {
                            if (hashSet.size() == SUM_PRODUCT) {
                                exec.shutdownNow();
                                //Finish = false;
                            }
                            Integer val = (Integer) boundedBuffer.take();
                            hashSet.add(val);
                            System.out.println(val);

                        } catch (InterruptedException e) {
                            //take()發出的中斷訊號被catch後,標誌為將被清楚,要想被for捕捉到,必須重新設定中斷!
                            if (e instanceof  InterruptedException){
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            });
        }

        exec.shutdown();
        //阻塞,等待所有任務執行完畢!
        for (;!exec.awaitTermination(10, TimeUnit.NANOSECONDS););
        System.out.println("hashSet.size():" + hashSet.size());
    }

}

注意:注意這裡中斷消費者執行緒的方式!