1. 程式人生 > 程式設計 >Java多執行緒Condition介面原理介紹

Java多執行緒Condition介面原理介紹

Condition介面提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式,但是這兩者在使用方式以及功能特性上還是有差別的

Java多執行緒Condition介面原理介紹

Condition介面詳解

Condition定義了等待/通知兩種型別的方法,當前執行緒呼叫這些方法時,需要提前獲取到Condition物件關聯的鎖。Condition物件是由Lock物件(呼叫Lock物件的newCondition()方法)創建出來的,換句話說,Condition是依賴Lock物件的。

Lock lock = new ReentrantLock();
 Condition condition = lock.newCondition();
 public void conditionWait() throws InterruptedException {
  lock.lock();
  try {
   condition.await();
  } finally {
   lock.unlock();
  }
 }
 public void conditionSignal() throws InterruptedException {
  lock.lock();
  try {
   condition.signal();
  } finally {
   lock.unlock();
  }
 }

一般都會將Condition物件作為成員變數。當呼叫await()方法後,當前執行緒會釋放鎖並在此等待,而其他執行緒呼叫Condition物件的signal()方法,通知當前執行緒後,當前執行緒才從await()方法返回,並且在返回前已經獲取了鎖。

 /**
  * 當前執行緒進入等待狀態直到被通知(signal)或中斷,當前執行緒進入後臺執行狀態且從await()方法返回
  * 其他執行緒呼叫該Condition的signal或者signalAll方法,而當前執行緒被選中喚醒
  * 1、其他執行緒(interrupt)中斷當前執行緒
  * 2、如果當前等待執行緒從await方法返回,那麼表明當前執行緒已經獲取了Condition物件的鎖
  */
 void await() throws InterruptedException;

 /**
  * 當前執行緒進入等待狀態直到被通知,對中斷不響應
  */
 void awaitUninterruptibly();

 /**
  * <pre> {@code
  * boolean aMethod(long timeout,TimeUnit unit) {
  * long nanos = unit.toNanos(timeout);
  * lock.lock();
  * try {
  *  while (!conditionBeingWaitedFor()) {
  *  if (nanos <= 0L)
  *   return false;
  *  nanos = theCondition.awaitNanos(nanos);
  *  }
  *  // ...
  * } finally {
  *  lock.unlock();
  * }
  * }}</pre>
  * 當前執行緒進入等待狀態直到被通知、中斷或超時。返回值表示剩餘時間,如果在nanosTimeout納秒之前被喚醒,那麼返回值就是nanosTimeout-實際耗時
  * 返回值<=0說明超時
  * 
  */
 long awaitNanos(long nanosTimeout) throws InterruptedException;

 /**
  * 當前執行緒進入等待狀態直到被通知、中斷或超時,如果沒有到指定時間被通知返回true,否則返回false
  */
 boolean await(long time,TimeUnit unit) throws InterruptedException;

 /**
  * 喚醒一個等待在Condition上的執行緒,該執行緒從等待方法返回之前必須獲得與Condition相關聯的鎖
  */
 void signal();

獲取一個Condition必須通過Lock的newCondition()方法。下面通過一個有界佇列的示例來深入瞭解Condition的使用方式。

有界佇列是一種特殊的佇列,當佇列為空時,佇列的獲取操作將會阻塞獲取執行緒,直到佇列中有新增元素,當佇列已滿時,佇列的插入操作將會阻塞插入執行緒,直到隊列出現“空位”

public class BoundedQueue<T> {
 private Object[] items;

 // 新增的下標,刪除的下標和陣列當前數量
 private int addIndex,removeIndex,count;
 private Lock lock = new ReentrantLock();
 private Condition notEmpty = lock.newCondition();
 private Condition notFull = lock.newCondition();
 public BoundedQueue(int size){
  items = new Object[size];
 }

 /**
  * 新增一個元素,如果陣列滿,則新增執行緒進入等待狀態,直到有"空位"
  * @author fuyuwei
  * 2017年5月21日 下午6:14:55
  * @param t
  * @throws InterruptedException
  */
 public void add(T t) throws InterruptedException{
  lock.lock();
  try{
   while(count == items.length){
    notFull.await();
   }
   items[addIndex] = t;
   if(++addIndex == items.length)
    addIndex = 0;
   ++count;
   notEmpty.signal();
  }finally{
   lock.unlock();
  }
 }

 /**
  * 由頭部刪除一個元素,如果陣列空,則刪除執行緒進入等待狀態,直到有新新增元素
  * @author fuyuwei
  * 2017年5月21日 下午6:20:54
  * @return
  * @throws InterruptedException
  */
 @SuppressWarnings("unchecked")
 public T remove() throws InterruptedException{
  lock.lock();
  try{
   while(count == 0)
    notEmpty.await();
   Object x = items[removeIndex];
   if(++removeIndex == items.length)
    removeIndex = 0;
   --count;
   notFull.signal();
   return (T)x;
  }finally{
   lock.unlock();
  }
 }
}

首先需要獲得鎖,目的是確保陣列修改的可見性和排他性。當陣列數量等於陣列長度時,表示陣列已滿,則呼叫notFull.await(),當前執行緒隨之釋放鎖並進入等待狀態。如果陣列數量不等於陣列長度,表示陣列未滿,則新增元素到陣列中,同時通知等待在notEmpty上的執行緒,陣列中已經有新元素可以獲取。

在新增和刪除方法中使用while迴圈而非if判斷,目的是防止過早或意外的通知,只有條件符合才能夠退出迴圈。回想之前提到的等待/通知的經典範式,二者是非常類似的

Condition原理分析

ConditionObject是同步器AbstractQueuedSynchronizer的內部類,因為Condition的操作需要獲取相關聯的鎖,所以作為同步器的內部類也較為合理。每個Condition物件都包含著一個佇列,該佇列是Condition物件實現等待/通知功能的關鍵。下面將分析Condition的實現,主要包括:等待佇列、等待和通知

等待佇列

等待佇列是一個FIFO的佇列,在佇列中的每個節點都包含了一個執行緒引用,該執行緒就是在Condition物件上等待的執行緒,如果一個執行緒呼叫了Condition.await()方法,那麼該執行緒將會釋放鎖、構造成節點加入等待佇列並進入等待狀態

一個Condition包含一個等待佇列,Condition擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前執行緒呼叫Condition.await()方法,將會以當前執行緒構造節點,並將節點從尾部加入等待佇列,等待佇列的基本結構如下圖所示

Java多執行緒Condition介面原理介紹

如圖所示,Condition擁有首尾節點的引用,而新增節點只需要將原有的尾節點nextWaiter指向它,並且更新尾節點即可。上述節點引用更新的過程並沒有使用CAS保證,原因在於呼叫await()方法的執行緒必定是獲取了鎖的執行緒,也就是說該過程是由鎖來保證執行緒安全的。在Object的監視器模型上,一個物件擁有一個同步佇列和等待佇列,而併發包中的Lock(更確切地說是同步器)擁有一個同步佇列和多個等待佇列,其對應關係如下圖所示

Java多執行緒Condition介面原理介紹

等待

呼叫Condition的await()方法(或者以await開頭的方法),會使當前執行緒進入等待佇列並釋放鎖,同時執行緒狀態變為等待狀態。當從await()方法返回時,當前執行緒一定獲取了Condition相關聯的鎖。

如果從佇列(同步佇列和等待佇列)的角度看await()方法,當呼叫await()方法時,相當於同步佇列的首節點(獲取了鎖的節點)移動到Condition的等待佇列中

public final void await() throws InterruptedException {
  if (Thread.interrupted())
   throw new InterruptedException();
  // 當前執行緒加入等待佇列
  Node node = addConditionWaiter();
  // 釋放同步狀態,也就是釋放鎖
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  while (!isOnSyncQueue(node)) {
   LockSupport.park(this);
   if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
  }
  if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
   interruptMode = REINTERRUPT;
  if (node.nextWaiter != null)
   unlinkCancelledWaiters();
  if (interruptMode != 0)
   reportInterruptAfterWait(interruptMode);
 }

呼叫該方法的執行緒成功獲取了鎖的執行緒,也就是同步佇列中的首節點,該方法會將當前執行緒構造成節點並加入等待佇列中,然後釋放同步狀態,喚醒同步佇列中的後繼節點,然後當前執行緒會進入等待狀態。當等待佇列中的節點被喚醒,則喚醒節點的執行緒開始嘗試獲取同步狀態。如果不是通過其他執行緒呼叫Condition.signal()方法喚醒,而是對等待執行緒進行中斷,則會丟擲InterruptedException

通知

呼叫Condition的signal()方法,將會喚醒在等待佇列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步佇列中

Java多執行緒Condition介面原理介紹

public final void signal() {
  if (!isHeldExclusively())
   throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  if (first != null)
   doSignal(first);
 }

呼叫該方法的前置條件是當前執行緒必須獲取了鎖,可以看到signal()方法進行了isHeldExclusively()檢查,也就是當前執行緒必須是獲取了鎖的執行緒。接著獲取等待佇列的首節點,將其移動到同步佇列並使用LockSupport喚醒節點中的執行緒

節點從等待佇列移動到同步佇列的過程如下圖所示

Java多執行緒Condition介面原理介紹

通過呼叫同步器的enq(Node node)方法,等待佇列中的頭節點執行緒安全地移動到同步佇列。當節點移動到同步佇列後,當前執行緒再使用LockSupport喚醒該節點的執行緒。

被喚醒後的執行緒,將從await()方法中的while迴圈中退出(isOnSyncQueue(Node node)方法返回true,節點已經在同步佇列中),進而呼叫同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。

成功獲取同步狀態(或者說鎖)之後,被喚醒的執行緒將從先前呼叫的await()方法返回,此時該執行緒已經成功地獲取了鎖。

Condition的signalAll()方法,相當於對等待佇列中的每個節點均執行一次signal()方法,效果就是將等待佇列中所有節點全部移動到同步佇列中,並喚醒每個節點的執行緒。

以上這篇Java多執行緒Condition介面原理介紹就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。