1. 程式人生 > 其它 >生產者-消費者模式實現(ReentarntLock)

生產者-消費者模式實現(ReentarntLock)

生產者-消費者模式實現(ReentarntLock)

1. 介紹

在JAVA多執行緒程式設計中,生產者-消費者模式有多種實現方式,例如:

  • synchronized+wait/notify
  • 阻塞佇列BolckingQueue
  • ReentrantLock+兩個Condition條件

在這篇隨筆中主要考慮利用ReentarntLockCondition實現, 重點是利用中斷實現了生產者和消費者執行緒的執行終止

2. 程式碼實現

類檔案簡介:

  • Container 提供了向共享佇列入隊和出隊的方法
  • Consumer 封裝了消費者任務
  • Producer 封裝生產者任務
  • MainTest 執行測試

Container類實現:

public class Container<T> {
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    private final Queue<T> sharedQueue;
    private final int capacity;

    public Container( Queue<T> sharedQueue, int capacity) {
        if(sharedQueue == null){
            throw new NullPointerException("sharedQueue is null");
        }
        if (capacity < 1){
            throw new IllegalArgumentException("capacity less one");
        }
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
        this.sharedQueue = sharedQueue;
        this.capacity = capacity;
    }

    /**
     * 生產者生產操作
     * @param element  生產的元素
     * @throws InterruptedException 如果當前執行緒被中斷
     */
    public void put(T element) throws InterruptedException {
        try {
            lock.lockInterruptibly();
            while (sharedQueue.size() == capacity){
                notFull.await();
            }
            sharedQueue.offer(element);

            System.out.println("生產者執行緒-" + Thread.currentThread().getId()+" 生產: "+element+" 佇列元素數量: " + sharedQueue.size());
            notEmpty.signal();
        }finally {
            try{
                lock.unlock();
            }catch (IllegalMonitorStateException e){}
        }
    }

    /**
     * 消費者操作
     * @return 返回獲得的元素
     * @throws InterruptedException 如果當前執行緒被中斷
     */
    public T take() throws InterruptedException {
        try {
            lock.lockInterruptibly();
            while (sharedQueue.isEmpty()){
                notEmpty.await();
            }
            T element =  sharedQueue.poll();
            System.out.println("消費者執行緒-" + Thread.currentThread().getId()+" 消費 "+ element +" 佇列元素數量: " + sharedQueue.size());
            notFull.signal();
            return element;
        }finally{
            try{
                lock.unlock();
            }
            catch (IllegalMonitorStateException e){}
        }
    }
}

對於Container類有一下幾點說明:

  • 加鎖時使用lock.lockInterruptibly()而不是lock.lock(),使得在競爭鎖期間可以響應中斷。
  • take()put()方法向外丟擲InterruptedException是為了在外部呼叫時跳出死迴圈,從而實現執行緒的退出。InterruptedException異常是由方法內部lockInterruptibly()await()方法丟擲的。
  • finally語句塊中對lock.unlock()捕獲了IllegalMonitorStateException異常。這是因為當執行緒在呼叫lockInterruptibly後競爭鎖失敗,進入自選阻塞狀態,如果在此時被中斷,則在函式返回前執行lock.unlock()
    時沒有獲得鎖就會產生IllegalMonitorStateException異常,所以進行了捕獲。另外,當執行緒呼叫await()方法進入阻塞狀態時,被中斷返回時,是保證一定該執行緒獲得鎖的(),所以不會此種情況導致lock.unlock()丟擲IllegalMonitorStateException異常。
  • await()方法返回執行緒一定獲得該條件所屬的鎖(OpenJDK8 Condition#await() 部分註釋)

    In all cases, before this method can return the current thread must re-acquire the lock associated with this condition. When the thread returns it is guaranteed to hold this lock.

ConsumerProducer類實現

兩個類實現比較類似就一起介紹

  • Consumer類:
public class Consumer implements Runnable{
    private final Container<Integer> container;
    public Consumer(Container<Integer> container) {
        this.container = container;
    }
    @Override
    public void run() {
        try{
            while (true){
                // 模擬消費耗時
                // Thread.sleep(new Random().nextInt(100)+50);
                Integer element = container.take();
                // doSomething
            }
        }// InterruptedException 異常丟擲有兩處, Thread.sleep()方法、container.take()方法
        catch (InterruptedException e){
            System.out.println("消費者執行緒-" + Thread.currentThread().getId()+" 執行完畢");
        }
    }
}

Producer類:

public class Producer implements Runnable{
    private final Container<Integer> container;
    public Producer(Container<Integer> container) {
        this.container = container;
    }
    @Override
    public void run() {
        try{
            while (true){
                Random random = new Random();
                //sleep模擬生產產品的耗時
                //  Thread.sleep(new Random().nextInt(100)+50);
                int element = random.nextInt(1000);
                container.put(element);
            }
        }// InterruptedException異常丟擲有兩處: Thread.sleep()方法、container.put()方法
        catch (InterruptedException e){
            System.out.println("生產者執行緒-" + Thread.currentThread().getId()+" 執行完畢");
        }
    }
}

測試類MainTest實現

MainTest程式碼:

public class MainTest {
    public static void main(String[] args){
        Container<Integer> container = new Container<>(new ArrayDeque<>(), 5);
        //為了使所有提交任務都能及時執行,使用newCachedThreadPool()方法建立執行緒池
        ExecutorService service = Executors.newCachedThreadPool();
        
        int consumerCount = 5; //消費者執行緒數
        for (int i = 0; i < consumerCount; i++) {
            service.execute(new Consumer(container));
        }

        int producerCount = 4; //生產者執行緒數
        for (int i = 0; i < producerCount; i++) {
            service.execute(new Producer(container));
        }
        //阻塞主執行緒 一段時間
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //關閉執行緒池
        service.shutdownNow();
        boolean isTerminated = false;
        while (!isTerminated){
            try {
                isTerminated = service.awaitTermination(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("主執行緒退出");
    }
}

MainTest中需要注意的是關閉執行緒池呼叫的函式是ExecutorService.shutdownNow()而不是ExecutorService.shutdown()。呼叫shutdownNow()後執行緒池進入STOP狀態,此時會中斷所以執行緒(包括正在執行的執行緒),清空執行緒池的工作佇列;而呼叫shutdown()後執行緒池進入SHUTDOWN狀態,在此狀態下只會中斷空閒執行緒,不中斷正在執行的執行緒,所以如果呼叫shutdown()在該程式會一直執行,不會停下來。

3. 執行結果

  • 其中一次執行結果(部分)為:
生產者執行緒-19 生產: 686 佇列元素數量: 2
生產者執行緒-19 生產: 865 佇列元素數量: 3
生產者執行緒-19 生產: 346 佇列元素數量: 4
生產者執行緒-19 生產: 191 佇列元素數量: 5
消費者執行緒-13 消費 899 佇列元素數量: 4
消費者執行緒-13 消費 686 佇列元素數量: 3
消費者執行緒-13 消費 865 佇列元素數量: 2
消費者執行緒-16 消費 346 佇列元素數量: 1
消費者執行緒-16 消費 191 佇列元素數量: 0
生產者執行緒-20 生產: 56 佇列元素數量: 1
生產者執行緒-20 生產: 203 佇列元素數量: 2
生產者執行緒-20 生產: 902 佇列元素數量: 3
.......
.......
.......
生產者執行緒-17 生產: 180 佇列元素數量: 1
生產者執行緒-17 生產: 550 佇列元素數量: 2
生產者執行緒-17 生產: 925 佇列元素數量: 3
消費者執行緒-13 消費 180 佇列元素數量: 2
消費者執行緒-14 消費 550 佇列元素數量: 1
生產者執行緒-17 執行完畢
生產者執行緒-18 生產: 722 佇列元素數量: 2
生產者執行緒-20 生產: 998 佇列元素數量: 3
消費者執行緒-12 消費 925 佇列元素數量: 2
消費者執行緒-14 執行完畢
生產者執行緒-18 執行完畢
消費者執行緒-13 執行完畢
生產者執行緒-20 執行完畢
消費者執行緒-15 消費 722 佇列元素數量: 1
消費者執行緒-12 執行完畢
生產者執行緒-19 生產: 971 佇列元素數量: 2
消費者執行緒-16 消費 998 佇列元素數量: 1
消費者執行緒-15 執行完畢
生產者執行緒-19 執行完畢
消費者執行緒-16 執行完畢
主執行緒退出

4. 補充測試

MainTest測試中使用了執行緒池管理執行緒,並中斷執行緒。如果不使用執行緒池而直接使用new Thread()的方法生產執行緒,則可以按照如下程式碼MainTest2測試:

public class MainTest2 {
    public static void main(String[] args) {
        Container<Integer> container = new Container<>(new ArrayDeque<>(), 5);
        //定義消費者執行緒
        Runnable consumer = new Consumer(container);
        int consumerCount = 5;
        List<Thread> consumerThreads = new LinkedList<>();
        for (int i = 0; i < consumerCount; i++) {
            Thread thread = new Thread(consumer);
            consumerThreads.add(thread);
            thread.start();
        }
        //定義生產者執行緒
        Runnable producer= new Producer(container);
        int producerCount = 4;
        List<Thread> producerThreads = new LinkedList<>();
        for (int i = 0; i < producerCount; i++) {
            Thread thread = new Thread(producer);
            producerThreads.add(thread);
            thread.start();
        }
        //阻塞主執行緒 一段時間
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //通過主執行緒依次中斷生產者和消費者執行緒
        for (Thread consumerThread : consumerThreads) {
            consumerThread.interrupt();
        }
        for (Thread producerThread : producerThreads) {
            producerThread.interrupt();
        }
        //等待執行緒全部執行結束後主執行緒才退出
        boolean isRunning = true;
        while (isRunning){
            isRunning = false;
            for (Thread consumerThread : consumerThreads) {
                isRunning = isRunning || consumerThread.isAlive();
            }
            for (Thread producerThread : producerThreads) {
                isRunning = isRunning || producerThread.isAlive();
            }
        }
        System.out.println("主執行緒退出");
    }
}

5. 參考:

  1. 經典併發同步模式:生產者-消費者設計模式

本文主要目的是記錄學習過程,加深對知識點理解; 如有行文有誤,望指正。