Condition的await和signal等待/通知機制
Condition簡介
任何一個java物件都天然繼承於Object類,線上程間實現通訊的往往會應用到Object的幾個方法,比如 wait()
,wait(long timeout),wait(long timeout, int nanos)
與notify()
,notifyAll()
幾個方法實現等待/通知機制。同樣的, 在 java Lock體系下依然會有同樣的方法實現等待/通知機制。
從整體上來看Object類提供的wait和notify/notify方法是與物件監視器monitor配合完成執行緒間的等待/通知機制,屬於JVM底層實現。而Condition與Lock配合完成的等待通知機制,屬於Java語言級別的,具有更高的可控制性和擴充套件性。
兩者除了在使用方式上不同外,在功能特性上還是有很多的不同:
- Condition能夠支援不響應中斷,而通過使用Object方式不支援;
- Condition能夠支援多個等待佇列(new 多個Condition物件),而Object方式只能支援一個;
- Condition能夠支援超時時間的設定,而Object不支援
Condition方法
參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:
針對Object的wait方法
void await() throws InterruptedException //當前執行緒進入等待狀態,同Object.wait(),直到被中斷或喚醒 long awaitNanos(long nanosTimeout) // 當前執行緒進入等待狀態,不響應中斷,直到被喚醒 boolean await(long time, TimeUnit unit)throws InterruptedException // 同Object.wait(long timeout),多了自定義時間單位,在中斷、超時、被喚醒三種情況下返回 boolean awaitUntil(Date deadline) throws InterruptedException // 支援設定截止時間
針對Object的notify/notifyAll方法
void signal():喚醒一個等待在condition上的執行緒,將該執行緒從等待佇列中轉移到同步佇列中,如果在同步佇列中能夠競爭到Lock則可以從等待方法中返回。
void signalAll():將所有等待在condition上的執行緒全部轉移到同步佇列中
Condition實現原理分析
等待佇列
建立一個 condition 物件是通過lock.newCondition()
,而這個方法實際上是會new出一個ConditionObject
物件,該類是AQS的一個內部類。前面我們說過,condition是要和lock配合使用的也就是condition和Lock是繫結在一起的,而lock的實現原理又依賴於AQS,自然而然ConditionObject作為AQS的一個內部類無可厚非。
我們知道在鎖機制的實現上,AQS內部維護了一個同步佇列,如果是獨佔式鎖的話,所有獲取鎖失敗的執行緒尾插入到同步佇列,同樣的,condition內部也是使用同樣的方式,內部維護了一個等待佇列,所有呼叫condition.await()
方法的執行緒會加入到等待佇列中,並且執行緒狀態轉換為等待狀態。
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
從中我們就可以看出來ConditionObject通過持有等待佇列的頭(Node firstWaiter
)尾(Node lastWaiter
)指標來管理等待佇列。需要注意的是Node類複用了在AQS中的Node類
static final class Node {
...
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
...
}
Node類有這樣一個屬性Node nextWaiter;
(後繼節點),進一步說明,等待佇列是一個單向佇列,而在之前說AQS時知道同步佇列是一個雙向佇列。
我們通過程式碼來進一步驗證等待佇列的結構:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class WaitTestDemo {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
for(int i = 0;i < 10;i++){
new Thread(()->{
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}).start();
}
}
}
新建了10個執行緒,沒有執行緒先獲取鎖,然後呼叫condition.await方法釋放鎖將當前執行緒加入到等待佇列中,通過 debug控制當走到第10個執行緒的時候檢視firstWaiter即等待佇列中的頭結點,debug模式下情景圖如下:
從這個圖我們可以很清楚的看到這樣幾點:
- 呼叫condition.await方法後執行緒依次尾插入到等待佇列中,如圖佇列中的執行緒引用依次為Thread-0,Thread-1,Thread-2…Thread-8;
- 等待佇列是一個單向佇列。通過我們的猜想然後進行實驗驗證,我們可以得出等待佇列的示意圖如下圖所示:
同時還有一點需要注意的是:我們可以多次呼叫lock.newCondition()
方法建立多個condition物件,也就是一個lock可以持有多個等待佇列。而在之前利用Object的方式實際上是指在物件Object物件監視器上只能擁有一個同步佇列和一個等待佇列,而併發包中的Lock擁有一個同步佇列和多個等待佇列。
如圖所示,ConditionObject是AQS的內部類,因此每個ConditionObject能夠訪問到AQS提供的方法,相當於每個Condition都擁有所屬同步器的引用。
Condition應用場景—用Condition來實現有界佇列
有界佇列是一種特殊的佇列,當佇列為空時,佇列的獲取(刪除)操作將會阻塞獲取(刪除)執行緒,直到佇列中有新增 元素;當佇列已滿時,佇列的插入操作將會阻塞插入執行緒,直到隊列出現空位。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedQueue<T> {
private Object[] items;
// 佇列中當前元素個數
private int count;
private Lock lock = new ReentrantLock();
private Condition empty = lock.newCondition();
private Condition full = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// 新增元素方法,如果當前佇列已滿,
// 則新增執行緒進入等待狀態,直到有空位被喚醒
public void addThraed(T t,int addIndex){
lock.lock();
try {
// 當前佇列已滿,新增執行緒進入等待狀態
while (count == items.length){
try {
full.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
items[addIndex] = t;
count++;
empty.signal();
}finally {
lock.unlock();
}
}
// 刪除元素方法,如果當前佇列為空,
// 則移除執行緒進入等待狀態直到到佇列不為空時被喚醒
public T removeThread(int removeIndex) {
lock.lock();
try {
// 當佇列為空時,移除執行緒進入等待狀態
while (count == 0) {
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object x = items[removeIndex];
count--;
full.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
await實現原理
當呼叫condition.await()
方法後會使得當前獲取lock的執行緒進入到等待佇列,如果該執行緒能夠從await()方法返回的話, 一定是該執行緒獲取了與condition相關聯的lock。await()方法原始碼為:
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1.將當前執行緒包裝為Node,尾插到等待佇列中
Node node = addConditionWaiter();
// 2.釋放當前執行緒所佔用的lock,釋放後會喚醒同步佇列中的下一個節點
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 當前節點不在同步佇列時被阻塞進入等待狀態
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被喚醒後進入同步佇列自旋競爭同步狀態
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 處理中斷狀況
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
噹噹前執行緒呼叫condition.await()
方法後,會使得當前執行緒釋放lock然後加入到等待佇列中,直至被signal/signalAll()
後會使得當前執行緒從等待佇列中移至到同步佇列中去,直到獲得了lock後才 會從await方法返回,或者在等待時被中斷會做中斷處理。
那麼關於這個實現過程我們會有這樣幾個問題:
- 是怎樣將當前執行緒新增到等待佇列中去的?
- 釋放鎖的過程?
- 怎樣才能從await方法退出?
在第1步中呼叫addConditionWaiter()
將當前執行緒新增到等待佇列中,該方法原始碼為:
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 將當前執行緒包裝為Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
// 尾插入等待佇列
t.nextWaiter = node;
lastWaiter = node;
return node;
}
將當前節點包裝成Node,如果等待佇列的firstWaiter為null的話(等待佇列為空佇列), 則將firstWaiter指向當前的Node;否則,更新lastWaiter(尾節點)即可。就是通過尾插入的方式將當前執行緒封裝的Node插入到等待佇列中即可,同時可以看出等待佇列是一個不帶頭結點的鏈式佇列,我們知道AQS的同步佇列是一個帶頭結點的鏈式佇列,這是兩者的一個區別。
將當前節點插入到等待佇列之後,會使當前執行緒釋放lock,由 fullyRelease()
方法實現,fullyRelease原始碼為:
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
// 呼叫AQS的釋放同步狀態方法release()
if (release(savedState)) {
// 成功釋放鎖狀態
failed = false;
return savedState;
} else {
// 釋放同步狀態失敗丟擲異常
throw new IllegalMonitorStateException();
}
} finally {
// 釋放同步狀態失敗後將當前節點狀態置為取消狀態
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
呼叫AQS的模板方法release方法釋放AQS的同步狀態並且喚醒在同步佇列中頭結點的後繼節點引用的執行緒,如果釋放成功則正常返回,若失敗的話就丟擲異常。
那麼怎樣從await方法退出? await方法有這樣一段邏輯:
while (!isOnSyncQueue(node)) {
// 當前節點不在同步佇列時被阻塞進入等待狀態
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
很顯然,當執行緒第一次呼叫condition.await()
方法時,會進入到這個while()迴圈中,然後通過LockSupport.park(this)
方法使得當前執行緒進入等待狀態,那麼要想退出這個await方法第一個前提條件自然而然的是要先退出這個while循 環,出口就只剩下兩個地方:1. 邏輯走到break退出while迴圈;2. while迴圈中的邏輯判斷為false。
程式碼出現第1種情況的條件是當前等待的執行緒被中斷後程式碼會走到break退出,第二種情況是當前節點被移動到了同步佇列中 (即另外執行緒呼叫的condition的signal或者signalAll方法),while中邏輯判斷為false後結束while迴圈。總結下,就是當前執行緒被中斷或者呼叫condition.signal/condition.signalAll
方法使當前節點移動到了同步佇列後 ,這是當前執行緒退出await方法的前提條件。
當退出while迴圈後就會呼叫acquireQueued(node, savedState)
,這個方法在介紹AQS的底層實現時說過了,該方法的作用是在自旋過程中執行緒不斷嘗試獲取同步狀態,直至成功(執行緒獲取到lock)。這 樣也說明了退出await方法必須是已經獲得了condition引用(關聯)的lock。
如上圖所示:呼叫condition.await()
方法的執行緒必須是已經獲得了lock,也就是當前執行緒是同步佇列中的頭結點。呼叫該方法後會使得當前執行緒所封裝的Node尾插入到等待佇列中。
signal/signalAll實現原理
呼叫condition的signal或者signalAll方法可以將等待佇列中等待時間最長的節點移動到同步佇列中,使得該節點能夠有機會獲得lock。按照等待佇列是先進先出(FIFO)的原則,等待佇列的頭節點必然會是等待時間長的節點, 也就是每次呼叫condition的signal方法是將頭節點移動到同步佇列中。signal方法原始碼為:
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 當前執行緒是否已經獲取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 獲取等待佇列的第一個節點,之後的操作都是針對這個節點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
下面我們來看看doSignal方法做了些什麼事情,doSignal()
方法原始碼為:
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 將頭結點從等待佇列中移除
first.nextWaiter = null;
// transferForSignal方法對頭結點做真正的處理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
真正對頭節點做處理的邏輯在transferForSignal()
方法,該方法原始碼為:
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 首先將節點狀態更新為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 將節點使用enq方法尾插到同步佇列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
這段程式碼主要做了兩件事情:
- 將頭結點的狀態更改為CONDITION;
- 呼叫
enq()
方法,將該節點尾插入到同步佇列中。
現在我們可以得出結論:呼叫condition的signal的前提條件是當前執行緒已經獲取了lock,該方法會使得等待佇列中的頭節點即等待時間最長的那個節點移入到同步佇列,而移入到同步佇列後才有機會使得等待執行緒被喚醒,即從await方法中的LockSupport.park(this)
方法中返回,從而才有機會使得呼叫await方法的執行緒成功退出。signal執行示意圖如下圖:
signalAll()方法底層原理
signalAll()
與sigal()
方法的區別體現在doSignalAll()
方法上,前面我們已經知道doSignal方法只會對等待佇列的頭節點進行操作,而doSignalAll的原始碼為:
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
該方法只不過是將等待佇列中的每一個節點都移入到同步佇列中,即“通知”當前呼叫condition.await()
方法的每一個執行緒。
Condition機制實現生產-消費者模型
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Goods{
private String name;
private int count;
private int maxCount;
private Lock lock = new ReentrantLock();
// 消費者等待佇列
private Condition consumerCondition = lock.newCondition();
// 生產者等待佇列
private Condition producerCondition = lock.newCondition();
public Goods(int maxCount) {
this.maxCount = maxCount;
}
@Override
public String toString() {
return "Goods{" +
"name='" + name + '\'' +
", count=" + count +
'}';
}
/**
生產者方法
@param name 設定商品名稱
*/
public void setGoods(String name){
lock.lock();
try{
// 商品數量達到最大值,生產者執行緒進入生產者等待佇列
while (maxCount == count){
try {
System.out.println(Thread.currentThread().getName()
+"還有很多商品,歇會~");
producerCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.name = name;
count++;
System.out.println(Thread.currentThread().getName()+"生產"
+toString());
// 喚醒處於消費者的執行緒
consumerCondition.signalAll();
}finally {
lock.unlock();
}
}
/**
消費者方法
*/
public void getGoods(){
lock.lock();
try{
while (0 == count){
try {
System.out.println(Thread.currentThread().getName()
+"還沒有商品~");
consumerCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
System.out.println(Thread.currentThread().getName()+"消費"
+toString());
// 喚醒所有生產者的執行緒
producerCondition.signalAll();
}finally {
lock.unlock();
}
}
}
class Producer implements Runnable{
private Goods goods;
public Producer(Goods goods) {
this.goods = goods;
}
@Override
public void run() {
while(true){
this.goods.setGoods("限量版Mac口紅");
}
}
}
class Consumer implements Runnable{
private Goods goods;
public Consumer(Goods goods) {
this.goods = goods;
}
@Override
public void run() {
while(true){
this.goods.getGoods();
}
}
}
public class Cache {
public static void main(String[] args) {
Goods goods = new Goods(10);
Producer producer = new Producer(goods);
Consumer consumer = new Consumer(goods);
List<Thread> list = new ArrayList<>();
// 啟動5個生產者執行緒
for (int i = 0;i < 5;i++){
Thread thread = new Thread(producer,"生產者"+i);
list.add(thread);
}
// 啟動10個消費者執行緒
for (int i = 0;i < 10;i++){
Thread thread = new Thread(consumer,"消費"+i);
list.add(thread);
}
for (Thread thread : list){
thread.start();
}
}
}