1. 程式人生 > 實用技巧 >生產者消費者模式

生產者消費者模式

生產者消費者問題是執行緒模型中的經典問題:生產者和消費者在同一時間段內共用同一儲存空間,生產者向空間裡生產資料,而消費者取走資料。

阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞佇列就是用來給生產者和消費者解耦的。

wait/notify方法

首先,我們搞清楚Thread.sleep()方法和Object.wait()、Object.notify()方法的區別。

  1. sleep()是Thread類的方法;而wait()notify()notifyAll()是Object類中定義的方法;儘管這兩個方法都會影響執行緒的執行行為,但是本質上是有區別的。

  2. Thread.sleep()

    不會導致鎖行為的改變,如果當前執行緒是擁有鎖的,那麼Thread.sleep()不會讓執行緒釋放鎖。如果能夠幫助你記憶的話,可以簡單認為和鎖相關的方法都定義在Object類中,因此呼叫Thread.sleep()是不會影響鎖的相關行為。

  3. Thread.sleepObject.wait都會暫停當前的執行緒,對於CPU資源來說,不管是哪種方式暫停的執行緒,都表示它暫時不再需要CPU的執行時間。OS會將執行時間分配給其它執行緒。區別是呼叫wait後,需要別的執行緒執行notify/notifyAll才能夠重新獲得CPU執行時間。

執行緒狀態圖:

  • Thread.sleep()讓執行緒從 【running】 -> 【阻塞態】 時間結束/interrupt -> 【runnable】
  • Object.wait()讓執行緒從 【running】 -> 【等待佇列】notify -> 【鎖池】 -> 【runnable】

實現生產者消費者模型

生產者消費者問題是研究多執行緒程式時繞不開的經典問題之一,它描述是有一塊緩衝區作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。在Java中一共有四種方法支援同步,其中前三個是同步方法,一個是管道方法。

(1)Object的wait() / notify()方法
(2)Lock和Condition的await() / signal()方法
(3)BlockingQueue阻塞佇列方法
(4)PipedInputStream/PipedOutputStream

本文只介紹最常用的前三種,第四種暫不做討論。

1. 使用Object的wait() / notify()方法

wait()/nofity()方法是基類Object的兩個方法,也就意味著所有Java類都會擁有這兩個方法,這樣,我們就可以為任何物件實現同步機制。

    • wait():當緩衝區已滿/空時,生產者/消費者執行緒停止自己的執行,放棄鎖,使自己處於等待狀態,讓其他執行緒執行。
    • notify():當生產者/消費者向緩衝區放入/取出一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。
import java.util.Queue;
import java.util.Random;

/**
 * 生產者
 */
public class Producer {
    private Queue<Integer> queue;
    int maxSize;
    int i = 0;

    public Producer( Queue<Integer> queue, int maxSize) {
        this.queue = queue;
        this.maxSize = maxSize;
    }

    public void callProduce() throws InterruptedException {
        synchronized (queue) {
            while (queue.size() == maxSize) {
                System.out.println("Queue is full, [" + Thread.currentThread().getName() + "] thread waiting.");
                queue.wait();
            }
            System.out.println("[" + Thread.currentThread().getName() + "] Producing value : " + i);
            queue.offer( i++);
            queue.notifyAll();
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Producer
import java.util.Queue;
import java.util.Random;

/**
 * 消費者
 */
public class Consumer {
    private Queue<Integer> queue;
    int maxSize;

    public Consumer( Queue<Integer> queue, int maxSize) {
        this.queue = queue;
        this.maxSize = maxSize;
    }

    public void callConsumer() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                System.out.println("Queue is empty, [" + Thread.currentThread().getName() + "] thread is waiting.");
                queue.wait();
            }
            int x = queue.poll();
            System.out.println("[" + Thread.currentThread().getName() + "] Consuming value : " + x);
            queue.notifyAll();

            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Consumer
import lombok.SneakyThrows;

import java.util.LinkedList;
import java.util.Queue;

/**
 * 生產者消費者模式:使用Object.wait() / notify()方法實現
 */
public class Test {
    private static final int CAPACITY = 5;

    public static void main(String args[]) {
        Queue<Integer> queue = new LinkedList<Integer>();

        Producer producer1 = new Producer( queue, CAPACITY);
        Consumer consumer1 = new Consumer( queue, CAPACITY);

        Thread p1=new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                while (true){
                    producer1.callProduce();
                }
            }
        });
        Thread p2=new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                while (true){
                    producer1.callProduce();
                }
            }
        });
        Thread c1=new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                while (true){
                    consumer1.callConsumer();
                }
            }
        });
        p1.setName("P1");
        p2.setName("P2");
        c1.setName("C1");
        p1.start();
        p2.start();
        c1.start();
    }
}
測試類
D:\jdk1.8.0_241\bin\java "-javaagent:D:\IDEA\IntelliJ IDEA Community Edition 2017.3.5\lib\idea_rt.jar=61728:D:\IDEA\IntelliJ IDEA Community Edition 2017.3.5\bin" -Dfile.encoding=UTF-8 -classpath D:\jdk1.8.0_241\jre\lib\charsets.jar;D:\jdk1.8.0_241\jre\lib\deploy.jar;D:\jdk1.8.0_241\jre\lib\ext\access-bridge-64.jar;D:\jdk1.8.0_241\jre\lib\ext\cldrdata.jar;D:\jdk1.8.0_241\jre\lib\ext\dnsns.jar;D:\jdk1.8.0_241\jre\lib\ext\jaccess.jar;D:\jdk1.8.0_241\jre\lib\ext\jfxrt.jar;D:\jdk1.8.0_241\jre\lib\ext\localedata.jar;D:\jdk1.8.0_241\jre\lib\ext\nashorn.jar;D:\jdk1.8.0_241\jre\lib\ext\sunec.jar;D:\jdk1.8.0_241\jre\lib\ext\sunjce_provider.jar;D:\jdk1.8.0_241\jre\lib\ext\sunmscapi.jar;D:\jdk1.8.0_241\jre\lib\ext\sunpkcs11.jar;D:\jdk1.8.0_241\jre\lib\ext\zipfs.jar;D:\jdk1.8.0_241\jre\lib\javaws.jar;D:\jdk1.8.0_241\jre\lib\jce.jar;D:\jdk1.8.0_241\jre\lib\jfr.jar;D:\jdk1.8.0_241\jre\lib\jfxswt.jar;D:\jdk1.8.0_241\jre\lib\jsse.jar;D:\jdk1.8.0_241\jre\lib\management-agent.jar;D:\jdk1.8.0_241\jre\lib\plugin.jar;D:\jdk1.8.0_241\jre\lib\resources.jar;D:\jdk1.8.0_241\jre\lib\rt.jar;E:\study\target\classes;F:\maven\repo\com\squareup\okhttp3\okhttp\3.11.0\okhttp-3.11.0.jar;F:\maven\repo\com\squareup\okio\okio\1.14.0\okio-1.14.0.jar;F:\maven\repo\org\springframework\spring-beans\4.3.12.RELEASE\spring-beans-4.3.12.RELEASE.jar;F:\maven\repo\org\springframework\spring-context\4.3.12.RELEASE\spring-context-4.3.12.RELEASE.jar;F:\maven\repo\org\springframework\spring-expression\4.3.12.RELEASE\spring-expression-4.3.12.RELEASE.jar;F:\maven\repo\org\springframework\spring-core\4.3.12.RELEASE\spring-core-4.3.12.RELEASE.jar;F:\maven\repo\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;F:\maven\repo\org\springframework\spring-aop\4.3.12.RELEASE\spring-aop-4.3.12.RELEASE.jar;F:\maven\repo\org\aspectj\aspectjweaver\1.8.13\aspectjweaver-1.8.13.jar;F:\maven\repo\org\aspectj\aspectjrt\1.8.13\aspectjrt-1.8.13.jar;F:\maven\repo\org\springframework\spring-jdbc\4.3.12.RELEASE\spring-jdbc-4.3.12.RELEASE.jar;F:\maven\repo\org\springframework\spring-tx\4.3.12.RELEASE\spring-tx-4.3.12.RELEASE.jar;F:\maven\repo\org\projectlombok\lombok\1.16.20\lombok-1.16.20.jar;F:\maven\repo\mysql\mysql-connector-java\5.1.38\mysql-connector-java-5.1.38.jar;F:\maven\repo\org\mybatis\mybatis\3.2.8\mybatis-3.2.8.jar;F:\maven\repo\org\mybatis\mybatis-spring\1.3.2\mybatis-spring-1.3.2.jar;F:\maven\repo\com\mchange\c3p0\0.9.5.2\c3p0-0.9.5.2.jar;F:\maven\repo\com\mchange\mchange-commons-java\0.2.11\mchange-commons-java-0.2.11.jar;F:\maven\repo\com\alibaba\fastjson\1.2.58\fastjson-1.2.58.jar;F:\maven\repo\org\slf4j\slf4j-log4j12\1.7.5\slf4j-log4j12-1.7.5.jar;F:\maven\repo\org\slf4j\slf4j-api\1.7.5\slf4j-api-1.7.5.jar;F:\maven\repo\log4j\log4j\1.2.17\log4j-1.2.17.jar;F:\maven\repo\redis\clients\jedis\2.9.0\jedis-2.9.0.jar;F:\maven\repo\org\apache\commons\commons-pool2\2.4.2\commons-pool2-2.4.2.jar com.design_pattern.produce_consumer.Test
[P2] Producing value : 0
[P1] Producing value : 1
[P1] Producing value : 2
[P1] Producing value : 3
[P1] Producing value : 4
Queue is full, [P1] thread waiting.
[C1] Consuming value : 0
[C1] Consuming value : 1
[C1] Consuming value : 2
[C1] Consuming value : 3
[C1] Consuming value : 4
[P1] Producing value : 5
[P2] Producing value : 6
[P2] Producing value : 7
[P2] Producing value : 8
[P2] Producing value : 9
Queue is full, [P2] thread waiting.
Queue is full, [P1] thread waiting.
[C1] Consuming value : 5
[P1] Producing value : 10
Queue is full, [P2] thread waiting.
Queue is full, [P1] thread waiting.
[C1] Consuming value : 6
[C1] Consuming value : 7
[C1] Consuming value : 8
[C1] Consuming value : 9
[C1] Consuming value : 10
Queue is empty, [C1] thread is waiting.
[P1] Producing value : 11
[P1] Producing value : 12
[P1] Producing value : 13
[P1] Producing value : 14
[P1] Producing value : 15
Queue is full, [P1] thread waiting.
Queue is full, [P2] thread waiting.
[C1] Consuming value : 11
[C1] Consuming value : 12
[C1] Consuming value : 13
[P2] Producing value : 16
[P2] Producing value : 17
[P2] Producing value : 18
Queue is full, [P2] thread waiting.
Queue is full, [P1] thread waiting.
[C1] Consuming value : 14
[C1] Consuming value : 15
[P1] Producing value : 19
[P1] Producing value : 20
Queue is full, [P1] thread waiting.
Queue is full, [P2] thread waiting.
[C1] Consuming value : 16
[C1] Consuming value : 17
[C1] Consuming value : 18
[C1] Consuming value : 19
[C1] Consuming value : 20
Queue is empty, [C1] thread is waiting.
[P2] Producing value : 21
[P1] Producing value : 22
[P1] Producing value : 23
[P2] Producing value : 24
[P2] Producing value : 25
Queue is full, [P2] thread waiting.
[C1] Consuming value : 21
[P2] Producing value : 26
Queue is full, [P2] thread waiting.
Queue is full, [P1] thread waiting.
[C1] Consuming value : 22
[C1] Consuming value : 23
[C1] Consuming value : 24
[C1] Consuming value : 25
[P1] Producing value : 27
[P1] Producing value : 28
[P1] Producing value : 29
[P1] Producing value : 30
Queue is full, [P1] thread waiting.
Queue is full, [P2] thread waiting.
[C1] Consuming value : 26
[C1] Consuming value : 27
[C1] Consuming value : 28
[C1] Consuming value : 29
[C1] Consuming value : 30
Queue is empty, [C1] thread is waiting.
[P2] Producing value : 31
[P2] Producing value : 32
[P1] Producing value : 33
[P2] Producing value : 34
[C1] Consuming value : 31
[C1] Consuming value : 32
[C1] Consuming value : 33
[C1] Consuming value : 34
[P2] Producing value : 35
[P2] Producing value : 36
[P1] Producing value : 37
[P2] Producing value : 38
[C1] Consuming value : 35
[C1] Consuming value : 36
[P2] Producing value : 39
[P1] Producing value : 40
[P2] Producing value : 41
[C1] Consuming value : 37
[P2] Producing value : 42
Queue is full, [P2] thread waiting.
Queue is full, [P1] thread waiting.
執行結果
注意要點

判斷Queue大小為0或者大於等於queueSize時須使用while (condition) {},不能使用if(condition) {}。其中while(condition)迴圈,它又被叫做“自旋鎖”。為防止該執行緒沒有收到notify()呼叫也從wait()中返回(也稱作虛假喚醒),這個執行緒會重新去檢查condition條件以決定當前是否可以安全地繼續執行還是需要重新保持等待,而不是認為執行緒被喚醒了就可以安全地繼續執行了。

2. 使用Lock和Condition的await() / signal()方法

在JDK5.0之後,Java提供了更加健壯的執行緒處理機制,包括同步、鎖定、執行緒池等,它們可以實現更細粒度的執行緒控制。Condition介面的await()signal()就是其中用來做同步的兩種方法,它們的功能基本上和Object的wait()/nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。通過在Lock物件上呼叫newCondition()方法,將條件變數和一個鎖物件進行繫結,進而控制併發程式訪問競爭資源的安全。下面來看程式碼:

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer {

    private final Lock lock = new ReentrantLock();

    private final Condition addCondition = lock.newCondition();

    private final Condition subCondition = lock.newCondition();


    private static int num = 0;
    private List<String> lists = new LinkedList<String>();

    public void add() {
        lock.lock();
        try {
            while (lists.size() == 10) {//當集合已滿,則"新增"執行緒等待
                System.out.println(Thread.currentThread().getName()+" list is full,await...");
                addCondition.await();
            }
            num++;
            lists.add("add num " + num);
            System.out.println(Thread.currentThread().getName()+" add num:"+num);

            this.subCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {//釋放鎖
            lock.unlock();
        }
    }

    public void sub() {
        lock.lock();
        try {
            while (lists.size() == 0) {//當集合為空時,"減少"執行緒等待
                System.out.println(Thread.currentThread().getName()+" list is empty,await...");
                subCondition.await();
            }
            String str = lists.get(0);
            lists.remove(0);
            System.out.println(Thread.currentThread().getName()+" sub num:"+num);
            num--;
            addCondition.signal();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}
ProducerConsumer