java併發等待條件的實現原理(Condition)
前言
前面介紹了排它鎖,共享鎖的實現機制,本篇繼續學習AQS中的另外一個內容-Condition。想必學過java的都知道Object.wait和Object.notify,同時也應該知曉這兩個方法的使用離不開synchronized關鍵字。synchronized是jvm級別提供的同步原語,它的實現機制隱藏在jvm實現中。作為Lock系列功能中的Condition,就是用來實現類似 Object.wait和Object.notify 對應功能的。
使用場景
為了更好的理解Lock和Condition的使用場景,下面我們先來實現這樣一個功能:有多個生產者,多個消費者,一個產品容器,我們假設容器最多可以放3個產品,如果滿了,生產者需要等待產品被消費,如果沒有產品了,消費者需要等待。我們的目標是一共生產10個產品,最終消費10個產品,如何在多執行緒環境下完成這一挑戰呢?下面是我簡單實現的一個demo,僅供參考。
package com.lock.condition.test;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class LockConditionTest {
// 生產 和 消費 的最大總數
public static int totalCount = 10;
// 已經生產的產品數
public static volatile int hasProduceCount = 0;
// 已經消費的產品數
public static volatile int hasConsumeCount = 0;
// 容器最大容量
public static int containerSize = 3;
// 使用公平策略的可重入鎖,便於觀察演示結果
public static ReentrantLock lock = new ReentrantLock(true );
public static Condition notEmpty = lock.newCondition();
public static Condition notFull = lock.newCondition();
// 容器
public static LinkedList<Integer> container = new LinkedList<Integer>();
// 用於標識產品
public static AtomicInteger idGenerator = new AtomicInteger();
public static void main(String[] args) {
Thread p1 = new Thread(new Producer(), "p-1");
Thread p2 = new Thread(new Producer(), "p-2");
Thread p3 = new Thread(new Producer(), "p-3");
Thread c1 = new Thread(new Consumer(), "c-1");
Thread c2 = new Thread(new Consumer(), "c-2");
Thread c3 = new Thread(new Consumer(), "c-3");
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
try{
c1.join();
c2.join();
c3.join();
p1.join();
p2.join();
p3.join();
}catch(Exception e){
}
System.out.println(" done. ");
}
static class Producer implements Runnable{
@Override
public void run() {
while(true){
lock.lock();
try{
// 容器滿了,需要等待非滿條件
while(container.size() >= containerSize){
notFull.await();
}
// 到這裡表明容器未滿,但需要再次判斷是否已經完成了任務
if(hasProduceCount >= totalCount){
System.out.println(Thread.currentThread().getName()+" producer exit");
return ;
}
int product = idGenerator.incrementAndGet();
// 把生產出來的產品放入容器
container.addLast(product);
System.out.println(Thread.currentThread().getName() + " product " + product);
hasProduceCount++;
// 通知消費執行緒可以去消費了
notEmpty.signal();
} catch (InterruptedException e) {
}finally{
lock.unlock();
}
}
}
}
static class Consumer implements Runnable{
@Override
public void run() {
while(true){
lock.lock();
try{
if(hasConsumeCount >= totalCount){
System.out.println(Thread.currentThread().getName()+" consumer exit");
return ;
}
// 一直等待有產品了,再繼續往下消費
while(container.isEmpty()){
notEmpty.await(2, TimeUnit.SECONDS);
if(hasConsumeCount >= totalCount){
System.out.println(Thread.currentThread().getName()+" consumer exit");
return ;
}
}
Integer product = container.removeFirst();
System.out.println(Thread.currentThread().getName() + " consume " + product);
hasConsumeCount++;
// 通知生產執行緒可以繼續生產產品了
notFull.signal();
} catch (InterruptedException e) {
}finally{
lock.unlock();
}
}
}
}
}
一次執行的結果如下:
p-1 product 1
p-3 product 2
p-2 product 3
c-3 consume 1
c-2 consume 2
c-1 consume 3
p-1 product 4
p-3 product 5
p-2 product 6
c-3 consume 4
c-2 consume 5
c-1 consume 6
p-1 product 7
p-3 product 8
p-2 product 9
c-3 consume 7
c-2 consume 8
c-1 consume 9
p-1 product 10
p-3 producer exit
p-2 producer exit
c-3 consume 10
c-2 consumer exit
c-1 consumer exit
p-1 producer exit
c-3 consumer exit
done.
從結果可以發現已經達到我們的目的了。
深入理解Condition的實現原理
上面的示例只是為了展示 Lock結合Condition可以實現的一種經典場景,在有了感性的認識之後,我們將一步一步來觀察Lock和Condition是如何協作完成這一任務的,這也是本篇的核心內容。
為了更好的理解和演示這一個過程,我們使用到的鎖是使用公平策略模式的,我們會使用上面例子運作的流程。我們會使用到3個生產執行緒,3個消費執行緒,分別表示 p1、p2、p3和c1、c2、c3。
Condition的內部實現是使用節點鏈來實現的,每個條件例項對應一個節點鏈,我們有notEmpty 和 notFull 兩個條件例項,所以會有兩個等待節點鏈。
一切準備就緒 ,開始我們的探索之旅。
1、執行緒c3執行,然後發現沒有產品可以消費,執行 notEmpty.await,進入等待佇列中等候。
2、執行緒c2和執行緒c1執行,然後發現沒有產品可以消費,執行 notEmpty.await,進入等待佇列中等候。
3、 執行緒 p1 啟動,得到了鎖,p1開始生產產品,這時候p3搶在p2之前,執行了lock操作,結果p2和p3都處於等待狀態,入同步佇列等待。
注意,本例中我們使用的是公平策略模式下的排它鎖,由於p3搶先執行取鎖操作,所以雖然p2和p3都被阻塞了,但是p3會優先被喚醒 。
4、這會,p1生產完畢,通知 not empty等待佇列,可以喚醒一個等待執行緒節點了,然後釋放了鎖,釋放鎖會導致p3被喚醒,然後p1進入下一個迴圈,進入同步佇列。
事情開始變得有趣了,p1執行一次生產後,執行了 notEmpty.signal,其效果就是把 not empty等待列表中的頭節點,即c3節點移到同步等待列隊中,重新參與搶佔鎖。
5、p3生產完了產品後,繼續notEmpty.signal,同時釋放鎖,釋放鎖後會喚醒p2執行緒,然後p3在下一輪嘗試獲取鎖的時候,再次入隊。
6、接著,p2繼續生產,生產後執行 notEmpty.signal,同時釋放鎖,釋放鎖後喚醒c3執行緒,然後p2在下一輪嘗試取鎖的時候,入列。
7、c3進行消費,你可以看到,現在 not empty等待列隊中已經沒有等待節點了,由於我們使用的是公平策略排它鎖,這就會導致同步佇列中的節點一個接著一個執行,而目前同步佇列中的節點排列為一生產,一消費,這不難可以知道,接下來程式碼已經不會進入 wait條件了,所以一個一個輪流執行就是,比如c3,執行完了,繼續notFull.signal(); 然後釋放鎖,入隊,這裡要明白,notFull.signal();這句程式碼其實沒有作用了,因為 not full等待佇列中沒有任何等待執行緒節點。 c3執行後,狀態如下圖所示:
8、後面的事情我想大家都可以想得出來是怎樣一步一步交替執行的了。
總結
本篇基於一個例項來演示結合Lock和Condition如何實現生產-消費模式,而且只討論一種可能執行的流程,是想更簡單的表述AQS底層是如何實現的。基於上面這個演示過程,針對其它的執行流程,其原來也是一樣的。Condition內部使用一個節點鏈來儲存所有 wait狀態的執行緒,當對應條件被signal的時候,就會把等待節點轉移到同步佇列中,繼續競爭鎖。原理其實並不複雜,有興趣的朋友可以翻閱原始碼。