關於java多執行緒淺析五: Condition條件
java.util.concurrent 包在java語言中可以說是比較難啃的一塊,但理解好這個包下的知識,對學習java來說,不可謂是一種大的提升,我也嘗試著用自己不聰明的腦袋努力的慢慢啃下點東西來。其實 java.util.concurrent 包中,最核心的就是AQS( AbstractQueuedSynchronizer) 這個抽象類,可以說是整個JUC包的基石,但今天先不說AQS,我先從比較容易理解的 Condition 條件講起。
什麼是Condition條件
Condition 是定義在 java.util.concurrent.locks 包下的一個介面,這個介面主要的功能就是實現了與Object類中的wait(),notify()方法相同的語義,但是功能上更強大。Condition 介面中定義的方式其實很少,列舉下來:
//使當前執行緒接到signal訊號之前,或者被中斷之前一直處於等待狀態
void await() throws InterruptedException;
//使當前執行緒接到signal訊號之前一直處於等待狀態
void awaitUninterruptibly();
//使當前執行緒接到signal訊號之前,或到達指定等待的時間之前,或者被中斷之前一直處於等待狀態
boolean await(long time, TimeUnit unit) throws InterruptedException;
//使當前執行緒接到signal訊號之前,或到達指定的最後期限時間之前,或者被中斷之前一直處於等待狀態
boolean awaitUntil(Date deadline) throws InterruptedException;
//使當前執行緒接到signal訊號之前,或到達指定等待的時間之前,或者被中斷之前一直處於等待狀態
long awaitNanos(long nanosTimeout) throws InterruptedException;
//向一個執行緒傳送喚醒訊號
void signal();
//向所有執行緒傳送喚醒訊號
void signalAll();
從定義的方法中也可以看出這個類的功能,無非就兩種:等待方法、喚醒等待方法。
Condition 的使用
下面用一個之前用的小demo展示一下 Condition 的使用方法:
先使用Object中的wait,notify 方法:
public class TestCondition {
public static void main(String[] args) {
try {
ThreadTest t1 = new ThreadTest("t1");
synchronized (t1) {
System.out.println(Thread.currentThread().getName()+"執行緒啟動執行緒 t1");
t1.start();
System.out.println(Thread.currentThread().getName()+"執行緒執行wait方法,等待被喚醒");
t1.wait();
System.out.println(Thread.currentThread().getName()+"執行緒繼續執行");
}
} catch (InterruptedException e)
e.printStackTrace();
}
}
}
class ThreadTest extends Thread{
public ThreadTest(String name){
super(name);
}
public void run(){
synchronized (this) {
System.out.println(Thread.currentThread().getName()+"執行緒執行 notify 方法");
notify();
}
}
}
執行結果:
main執行緒啟動執行緒 t1
main執行緒執行wait方法,等待被喚醒
t1執行緒執行 notify 方法
main執行緒繼續執行
這裡就不再詳述關於 synchronized 關鍵字的使用,舉這個示例是為了引出Condition的使用。
稍微改一下上面的程式:
public class TestCondition {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
try {
ThreadTest t1 = new ThreadTest("t1");
lock.lock();
System.out.println(Thread.currentThread().getName()+"執行緒啟動執行緒 t1");
t1.start();
System.out.println(Thread.currentThread().getName()+"執行緒執行condition.await()方法,等待被喚醒");
condition.await();
System.out.println(Thread.currentThread().getName()+"執行緒繼續執行");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
static class ThreadTest extends Thread{
public ThreadTest(String name){
super(name);
}
public void run(){
try{
lock.lock();
System.out.println(Thread.currentThread().getName()+"執行緒執行condition.signal()方法");
condition.signal();
}finally{
lock.unlock();
}
}
}
}
執行結果:
main執行緒啟動執行緒 t1
main執行緒執行condition.await()方法,等待被喚醒
t1執行緒執行condition.signal()方法
main執行緒繼續執行
根據上面的例項可以看出,Condition工具類是配合Lock類一起使用的,當然Condition的作用遠不止上面程式碼這樣簡單,其實它最主要的作用是可以在同一把鎖上,針對不同的業務使用不用的Condition。比如在前面的文章提到的生產消費問題,我們完全可以使用兩個Condition,一個針對於生產,一個針對於消費,當產品為0時,我們可以讓消費的Condition執行await方法,當產品不為0時,可以讓消費的Condition執行signal方法。生產者也是類似,具體程式碼這裡就不再詳述了,可以嘗試自己實現一下。
Condition 的實現類:ConditionObject
Condition 是一個介面,那它到底是怎麼工作的呢?我們來看一下ReentrantLock 類是怎麼樣使用Condition 的。
Condition condition = lock.newCondition();//這是生成Condition 的方法
追蹤一下newCondition( )這個方法,我們就可以看到一個Condition 的具體實現:
public Condition newCondition() {
return sync.newCondition(); //呼叫sync的newCondition
}
//sync的方法,返回一個new ConditionObject()
final ConditionObject newCondition() {
return new ConditionObject();
}
這個sync是什麼呢?它是定義在ReentrantLock 中的內部類,它繼承了上面提到的AQS這個抽象類,從某種角度來說ReentrantLock 只是提供了一個可以操作AQS這個核心類的入口,代理了一些重要的方法,那為什麼不讓ReentrantLock 直接繼承AQS,而是選擇用一個內部類來實現呢,可能是出於一些安全性方面的考慮。ConditionObject是定義在AQS中的。
Condition 與 AQS 到底是怎麼工作的?
探究這個問題,就需要來看一下ConditionObject的原始碼了。
代表性的,我們看一下await方法和signal方法的原始碼(基於jdk1.8.0_112):
await方法:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //判斷執行緒的中斷狀態,如果中斷則丟擲異常
//將當前執行緒包裝成一個條件等待節點新增到ConditionObject維護的一個佇列中
Node node = addConditionWaiter();
//釋放當前執行緒佔有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
//這個while 迴圈就是在當前執行緒釋放鎖後,一直觀察持有自己執行緒的節點有沒有被載入到
//AQS維護的等待佇列中(加入到這個佇列中才有獲取鎖的資格),什麼時候會加入到這個佇列中呢?
//當然是執行了喚醒這個執行緒對應的singal方法的時候啦
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
在看一下signal方法:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;//ConditionObject維護佇列中的頭節點,進行喚醒操作
if (first != null)
doSignal(first);
}
從上面的敘述不難看出,AQS 和 ConditionObject各自維護了一個佇列,用來存放包裝了執行緒的Node節點。聯絡上面的程式碼示例和原始碼可以總結一下AQS和ConditionObject中的佇列是怎麼被維護的:
(1) 示例中存在兩個執行緒:main執行緒 t1執行緒
(2) 當main執行緒呼叫lock.locak()方法時,main獲取到鎖,繼續執行, 當執行到t1.start()方法時,t1也想獲取lock,但是此時該鎖已經被main執行緒佔用,所以t1執行緒進入 AQS維護的等待佇列中,等待機會獲取cpu的佔用權。
(3) 當main執行緒執行了condition.await()方法時,main執行緒就在釋放佔用鎖的同時加入到了ConditionObject維護的等待佇列中,在這個佇列中的執行緒,如果signal狀態不發生改變,是永遠沒有機會獲取到cpu的佔有權的。
(4) 好,這個時候main已經進入了ConditionObject維護的等待佇列中,那麼AQS維護的等待佇列中的t1執行緒就可以獲取cpu的佔有權了,可以繼續執行。
(5) 當t1執行緒執行到 condition.signal() 方法時,就會喚醒ConditionObject維護的等待佇列中的頭節點,也就是main執行緒。但是注意,這裡喚醒的意思是將main執行緒節點放到AQS維護的等待佇列中,然後聽從AQS的排程,並不是馬上就能獲取cpu的佔有權。
(6) 然後t1執行緒執行結束,unlock釋放佔用的鎖,在AQS維護的等待佇列中的main就能繼續執行下去了。
總之:
- ConditionObject維護的等待佇列的功能是存放那些執行了await方法的執行緒,等待收到signal資訊好可以進入AQS維護的等待佇列中。在這個佇列中是不會獲取到鎖的。
- AQS維護的等待佇列存放那些就緒狀態的執行緒,只等待目前佔有鎖的傢伙執行完或者進入了ConditionObject維護的等待佇列中之後,來進行競爭獲取鎖。只有在這個佇列中才有資格(並不一定會)獲取鎖。