多執行緒必考的「生產者 - 消費者」模型,看齊姐這篇文章就夠了
生產者 - 消費者模型 Producer-consumer problem
是一個非常經典的多執行緒併發協作的模型,在分散式系統裡非常常見。也是面試中無論中美大廠都非常愛考的一個問題,對應屆生問的要少一些,但是對於有工作經驗的工程師來說,非常愛考。
這個問題有非常多的版本和解決方式,在本文我重點是和大家壹齊理清思路,由淺入深的思考問題,保證大家看完了都能有所收穫。
問題背景
簡單來說,這個模型是由兩類執行緒構成:
生產者執行緒:“生產”產品,並把產品放到一個佇列裡; 消費者執行緒:“消費”產品。
有了這個佇列,生產者就只需要關注生產,而不用管消費者的消費行為,更不用等待消費者執行緒執行完;消費者也只管消費,不用管生產者是怎麼生產的,更不用等著生產者生產。
所以該模型實現了生產者和消費者之間的解藕和非同步。
什麼是非同步呢?
比如說你和你女朋友打電話,就得等她接了電話你們才能說話,這是同步。
但是如果你跟她發微信,並不需要等她回覆,她也不需要立刻回覆,而是等她有空了再回,這就是非同步。
但是呢,生產者和消費者之間也不能完全沒有聯絡的。
如果佇列裡的產品已經滿了,生產者就不能繼續生產; 如果佇列裡的產品從無到有,生產者就得通知一下消費者,告訴它可以來消費了; 如果佇列裡已經沒有產品了,消費者也無法繼續消費; 如果佇列裡的產品從滿到不滿,消費者也得去通知下生產者,說你可以來生產了。
所以它們之間還需要有協作,最經典的就是使用 Object
類裡自帶的 wait()
notify()
或者 notifyAll()
的訊息通知機制。
上述描述中的等著,其實就是用 wait()
來實現的;
而通知,就是 notify()
或者 notifyAll()
。
那麼基於這種訊息通知機制,我們還能夠平衡生產者和消費者之間的速度差異。
如果生產者的生產速度很慢,但是消費者消費的很快,就像是我們每月工資就發兩次,但是每天都要花錢,也就是 1:15.
那麼我們就需要調整生產者(發工資)為 15 個執行緒,消費者保持 1 個執行緒,這樣是不是很爽~
總結下該模型的三大優點:
解藕,非同步,平衡速度差異。
wait()/notify()
接下來我們需要重點看下這個通知機制。
wait()
和 notify()
都是 Java 中的 Object
類自帶的方法,可以用來實現執行緒間的通訊。
在上一節講的 11 個 APIs 裡我也提到了它,我們這裡再展開講一下。
wait()
方法是用來讓當前執行緒等待,直到有別的執行緒呼叫 notify()
將它喚醒,或者我們可以設定一個時間讓它自動甦醒。
呼叫該方法之前,執行緒必須要獲得該物件的物件監視器鎖,也就是隻能用在加鎖的方法下。
而呼叫該方法之後,當前執行緒會釋放鎖。(提示:這裡很重要,也是下文程式碼中用 while
而非 if
的原因。)
notify()
方法只能通知一個執行緒,如果多個執行緒在等待,那就喚醒任意一個。
notifyAll()
方法是可以喚醒所有等待執行緒,然後加入同步佇列。
這裡我們用到了 2 個佇列:
同步佇列:對應於我們上一節講的執行緒狀態中的 Runnable
,也就是執行緒準備就緒,就等著搶資源了。等待佇列:對應於我們上一節講的執行緒狀態中的 Waiting
,也就是等待狀態。
這裡需要注意,從等待狀態執行緒無法直接進入 Q2,而是要先重新加入同步佇列,再次等待拿鎖,拿到了鎖才能進去 Q2;一旦出了 Q2,鎖就丟了。
在 Q2
裡,其實只有一個執行緒,因為這裡我們必須要加鎖才能進行操作。
實現
這裡我首先建了一個簡單的 Product
類,用來表示生產和消費的產品,大家可以自行新增更多的 fields
。
public class Product {
private String name;
public Product(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
主函式裡我設定了兩類執行緒,並且這裡選擇用普通的 ArrayDeque
來實現 Queue
,更簡單的方式是直接用 Java 中的 BlockingQueue
來實現。
BlockingQueue
是阻塞佇列,它有一系列的方法可以讓執行緒實現自動阻塞,常用的 BlockingQueue
有很多,後面會單獨出一篇文章來講。
這裡為了更好的理解併發協同的這個過程,我們先自己處理。
public class Test {
public static void main(String[] args) {
Queue<Product> queue = new ArrayDeque<>();
for (int i = 0; i < 100; i++) {
new Thread(new Producer(queue, 100)).start();
new Thread(new Consumer(queue, 100)).start();
}
}
}
然後就是 Producer
和 Consumer
了。
public class Producer implements Runnable{
private Queue<Product> queue;
private int maxCapacity;
public Producer(Queue queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}
@Override
public void run() {
synchronized (queue) {
while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解釋
try {
System.out.println("生產者" + Thread.currentThread().getName() + "等待中... Queue 已達到最大容量,無法生產");
wait();
System.out.println("生產者" + Thread.currentThread().getName() + "退出等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (queue.size() == 0) { //佇列裡的產品從無到有,需要通知在等待的消費者
queue.notifyAll();
}
Random random = new Random();
Integer i = random.nextInt();
queue.offer(new Product("產品" + i.toString()));
System.out.println("生產者" + Thread.currentThread().getName() + "生產了產品:" + i.toString());
}
}
}
其實它的主邏輯很簡單,我這裡為了方便演示加了很多列印語句才顯得有點複雜。
我們把主要邏輯拎出來看:
public void run() {
synchronized (queue) {
while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解釋
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (queue.size() == 0) {
queue.notifyAll();
}
queue.offer(new Product("產品" + i.toString()));
}
}
}
這裡有 3 塊內容,再對照這個過程來看:
生產者執行緒拿到鎖後,其實就是進入了 Q2
階段。首先檢查佇列是否容量已滿,如果滿了,那就要去Q3
等待;如果不滿,先檢查一下佇列原本是否為空,如果原來是空的,那就需要通知消費者; 最後生產產品。
這裡有個問題,為什麼只能用 while
而不是 if
?
其實在這一小段,生產者執行緒經歷了幾個過程:
如果佇列已滿,它就沒法生產,那也不能佔著位置不做事,所以要把鎖讓出來,去 Q3 - 等待佇列
等著;在等待佇列裡被喚醒之後,不能直接奪過鎖來,而是要先加入 Q1 - 同步佇列
等待資源;一旦搶到資源,關門上鎖,才能來到 Q2
繼續執行wait()
之後的活,但是,此時這個佇列有可能又滿了,所以退出wait()
之後,還需要再次檢查queue.size() == maxCapacity
這個條件,所以要用while
。
那麼為什麼可能又滿了呢?
因為執行緒沒有一直拿著鎖,在被喚醒之後,到拿到鎖之間的這段時間裡,有可能其他的生產者執行緒先拿到了鎖進行了生產,所以佇列又經歷了一個從不滿到滿的過程。
總結:在使用執行緒的等待通知機制時,一般都要在 while
迴圈中呼叫 wait()
方法。
消費者執行緒是完全對稱的,我們來看程式碼。
public class Consumer implements Runnable{
private Queue<Product> queue;
private int maxCapacity;
public Consumer(Queue queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}
@Override
public void run() {
synchronized (queue) {
while (queue.isEmpty()) {
try {
System.out.println("消費者" + Thread.currentThread().getName() + "等待中... Queue 已缺貨,無法消費");
wait();
System.out.println("消費者" + Thread.currentThread().getName() + "退出等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (queue.size() == maxCapacity) {
queue.notifyAll();
}
Product product = queue.poll();
System.out.println("消費者" + Thread.currentThread().getName() + "消費了:" + product.getName());
}
}
}
結果如下:
小結
生產者 - 消費者問題是面試中經常會遇到的題目,本文首先講了該模型的三大優點:解藕,非同步,平衡速度差異,然後講解了等待/通知的訊息機制以及在該模型中的應用,最後進行了程式碼實現。
文中所有程式碼已經放到了我的 Github 上:https://github.com/xiaoqi6666/NYCSDE
。
這個 Github 彙總了我所有的文章和資料,之後也會一直更新和維護,還希望大家幫忙點個 Star
,你們的支援和認可,就是我創作的最大動力,我們下篇文章見!
我是小齊,紐約程式媛,終生學習者,每天晚上 9 點,雲自習室裡不見不散!