1. 程式人生 > >淺談Java中的Condition條件佇列,手摸手帶你實現一個阻塞佇列!

淺談Java中的Condition條件佇列,手摸手帶你實現一個阻塞佇列!

條件佇列是什麼?可能很多人和我一樣答不出來,不過今天終於搞清楚了!

什麼是條件佇列

條件佇列:當某個執行緒呼叫了wait方法,或者通過Condition物件呼叫了await相關方法,執行緒就會進入阻塞狀態,並加入到對應條件佇列中。

在等待喚醒機制相關文章中我們提到了條件佇列,即當物件獲取到同步鎖之後,如果呼叫了wait方法,當前執行緒會進入到條件佇列中,並釋放鎖。

synchronized(物件){ // 獲取鎖失敗,執行緒會加入到同步佇列中 
    while(條件不滿足){
        物件.wait();// 呼叫wait方法當前執行緒加入到條件佇列中
    }
}

基於synchcronized的內建條件佇列存在一些缺陷。每個內建鎖都只能有一個相關聯的條件佇列,因而存在多個執行緒可能在同一個條件佇列上等待不同的條件謂詞,並且在最常見的加鎖模式下公開條件佇列物件。

Java中的鎖的實現可以分為兩種,一種是基於synchronized的隱式鎖,它是基於JVM層面實現的;而另一種則是基於AQS框架在程式碼層面實現的鎖,如ReentrantLock等,在進行併發控制過程中,很多情況下他們都可以相互替代。

其中同步佇列和條件佇列是AQS中兩個比較核心的概念,它們是程式碼層面實現鎖的關鍵。關於同步佇列的內容,我們已經在圖解AQS的設計與實現,手摸手帶你實現一把互斥鎖!中進行了詳細的介紹。

與Object配合synchronized相比,基於AQS的Lock&Condition實現的等待喚醒模式更加靈活,支援多個條件佇列,支援等待狀態中不響應中斷以及超時等待功能; 其次就是基於AQS實現的條件佇列是"肉眼可見"的,我們可以通過原始碼進行debug,而synchronized則是完全隱式的。

同步佇列和條件佇列

與條件佇列密不可分的類則是ConditionObject, 是AQS中實現了Condition介面的內部類,通常配合基於AQS實現的鎖一同使用。當執行緒獲取到鎖之後,可以呼叫await方法進入條件佇列並釋放鎖,或者呼叫singinal方法喚醒對應條件佇列中等待時間最久的執行緒並加入到等待佇列中。

在AQS中,執行緒會被封裝成Node物件加入佇列中,而條件佇列中則複用了同步佇列中的Node物件。

Condition相關方法和描述

Condition介面一共定義了以下幾個方法:

await(): 當前執行緒進入等待狀態,直到被通知(siginal)或中斷【和wait方法語義相同】。

awaitUninterruptibly(): 當前執行緒進入等待狀態,直到被通知,對中斷不敏感。

awaitNanos(long timeout): 當前執行緒進入等待狀態直到被通知(siginal),中斷或超時。

awaitUnitil(Date deadTime): 當前執行緒進入等待狀態直到被通知(siginal),中斷或到達某個時間。

signal(): 喚醒一個等待在Condition上的執行緒,該執行緒從等待方法返回前必須獲得與Condition關聯的鎖【和notify方法語義相同】

signalAll(): 喚醒所有等待在Condition上的執行緒,能夠從等待方法返回的執行緒必須獲得與Condition關聯的鎖【和notifyAll方法語義相同】。

條件佇列入隊操作

當執行緒獲取到鎖之後,Condition物件呼叫await相關的方法,執行緒會進入到對應的條件佇列中。

/**
  * 如果當前執行緒被終端,丟擲 InterruptedException 異常
  */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加當前執行緒到【條件佇列】
    Node node = addConditionWaiter();
    // 釋放已經獲取的鎖資源,並返回釋放前的同步狀態
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果當前節點不在【同步佇列】中, 執行緒進入阻塞狀態,等待被喚醒
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

條件隊出隊操作

Condition物件呼叫signal或者signalAll方法時,

/**
 * 將【條件佇列】中第一個有效的元素移除並且新增到【同步佇列】中
 * 所謂有效指的是非null,並且狀態嗎
 * @param first 條件佇列中第一個非空的元素
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    // 將條件佇列中等待最久的那個有效元素新增到同步佇列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * 將條件佇列中的節點轉換到同步佇列中
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     * 如果節點的等待狀態不能被修改,說明當前執行緒已經被取消等待【多個執行緒執行siginal時會出現的情況】
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * 加入到【同步佇列】中,並且嘗試將前驅節點設定為可喚醒狀態
     */
    Node p = enq(node); // 將node新增到同步佇列中,並返回它的前驅節點
    int ws = p.waitStatus;
    // 如果前驅節點不需要喚醒,或者設定狀態為‘喚醒’失敗,則喚醒執行緒時期重新爭奪同步狀態
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

實現阻塞佇列

  1. 自定義互斥鎖升級,增加獲取Condition物件介面。
/**
 * 自定義互斥鎖
 *
 * @author cruder
 * @time 2019/11/29 9:43
 */
public class MutexLock {

    private static final Sync STATE_HOLDER = new Sync();

    /**
     * 通過Sync內部類來持有同步狀態, 當狀態為1表示鎖被持有,0表示鎖處於空閒狀態
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 是否被獨佔, 有兩種表示方式
         *  1. 可以根據狀態,state=1表示鎖被佔用,0表示空閒
         *  2. 可以根據當前獨佔鎖的執行緒來判斷,即getExclusiveOwnerThread()!=null 表示被獨佔
         */
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() != null;
        }

        /**
         * 嘗試獲取鎖,將狀態從0修改為1,操作成功則將當前執行緒設定為當前獨佔鎖的執行緒
         */
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 釋放鎖,將狀態修改為0
         */
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //【新增程式碼】
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    /**
     * 下面的實現Lock介面需要重寫的方法,基本是就是呼叫內部內Sync的方法
     */
    public void lock() {
        STATE_HOLDER.acquire(1);
    }

    public void unlock() {
        STATE_HOLDER.release(1);
    }

    // 【新增程式碼】 獲取條件佇列
    public Condition newCondition(){
        return STATE_HOLDER.newCondition();
    }

}
  1. 基於自定義互斥鎖,實現阻塞佇列。阻塞佇列具有兩個特點:
  • 新增元素到佇列中, 如果佇列已滿會使得當前執行緒阻塞【加入到條件佇列-佇列不滿】,直到佇列不滿為止
  • 移除佇列中的元素,當佇列為空時會使當前執行緒阻塞【加入到條件佇列-佇列不空】,直到佇列不為空為止
/**
 *  有界阻塞阻塞佇列
 *
 * @author Jann Lee
 * @date 2019-12-11 22:20
 **/
public class BoundedBlockingQueue<T> {
    /**
     * list作為底層儲存結構
     */
    private List<T> dataList;
    /**
     * 佇列的大小
     */
    private int size;

    /**
     * 鎖,和條件變數
     */
    private MutexLock lock;
    /**
     * 佇列非空 條件變數
     */
    private Condition notEmpty;
    /**
     * 佇列未滿 條件變數
     */
    private Condition notFull;

    public BoundedBlockingQueue(int size) {
        dataList = new ArrayList<>();
        lock = new MutexLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
        this.size = size;
    }


    /**
     * 佇列中新增元素 [只有佇列未滿時才可以新增,否則需要等待佇列變成未滿狀態]
     */
    public void add(T data) throws InterruptedException {
        lock.lock();
        try {
            // 如果佇列已經滿了, 需要在等待“佇列未滿”條件滿足
            while (dataList.size() == size) {
                notFull.await();
            }
            dataList.add(data);
            Thread.sleep(2000);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 移除佇列的第一個元素[只有佇列非空才可以移除,否則需要等待變成佇列非空狀態]
     */
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            // 如果為空, 需要在等待“佇列非空”條件滿足
            while (dataList.isEmpty()) {
                notEmpty.await();
            }
            T result = dataList.remove(0);
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

}

總結

  1. 條件佇列和同步佇列在Java中有兩種實現,synchronized關鍵字以及基於AQS
  2. 每個(基於synchronized的)內建鎖都只能有一個相關聯的條件佇列,會存在多個執行緒可能在同一個條件佇列上等待不同的條件謂詞;而(基於AQS實現的)顯式鎖支援多個條件佇列
  3. 與wait,notify,notifyAll 對應的方法時Conditoin介面中的await,signal,signalAll,他們具有相同的語義

最後敬上一個關注了也沒有錯的公眾號~

相關推薦

JavaCondition條件佇列實現一個阻塞佇列

條件佇列是什麼?可能很多人和我一樣答不出來,不過今天終於搞清楚了! 什麼是條件佇列 條件佇列:當某個執行緒呼叫了wait方法,或者通過Condition物件呼叫了await相關方法,執行緒就會進入阻塞狀態,並加入到對應條件佇列中。 在等待喚醒機制相關文章中我們提到了條件佇列,即當物件獲取到同步鎖之後,如果呼叫

最近在研究多線程JAVA多線程的幾種實現方式

進行 數據 使用 導致 效率問題 多線程 方法 sta img 多線程的實現方式:   個人認為,要說多線程的實現方式,萬變不離其宗,最基本的就是兩種1.繼承Thread類;2.實現runnable接口,本質上來說就是用來啟動線程執行任務的過程,具體來說的話,通過這

Java的hashCode方法

implement state ask get() 存在 rsa key 沖突 如何 哈希表這個數據結構想必大多數人都不陌生,而且在很多地方都會利用到hash表來提高查找效率。在Java的Object類中有一個方法: public native int hashCode(

java內置的觀察者模式與動態代理的實現

所有 代理 notify play ani effect 一個 indicate protected 一.關於觀察者模式 1.將觀察者與被觀察者分離開來,當被觀察者發生變化時,將通知所有觀察者,觀察者會根據這些變化做出對應的處理。 2.jdk裏已經提供對應的Observer

Java的深拷貝和拷貝

detail tle pac err @override 復制對象 deep har 間接   淺談Java中的深拷貝和淺拷貝(轉載) 原文鏈接: http://blog.csdn.net/tounaobun/article/details/8491392 假如說你想復制一

JAVA“增強”類的某個方法的幾個方法

exc 目標 byte 相同 nbsp 優點 method value oca 一、繼承 使用場景:能夠控制這個類的構造的時候,才可以使用繼承。  優點:簡單容易使用, 缺點:耦合性大大的增強,不利於後期的維護,所以對於繼承這種方法,謹慎使用。 代碼實現:二、裝飾者模式 

JAVA字符串常量的儲存位置

數據 每一個 [] jit 返回 inf post 符號 boolean 在講述這些之前我們需要一些預備知識: Java的內存結構我們可以通過兩個方面去看待它。 一、從抽象的JVM的角度去看。相關定義請參考JVM規範:Chapter 2. The Structure o

java的"=="和eqals區別

short copy 覆寫 main 否則 變量 程序 bar gif 在初學Java時,可能會經常碰到下面的代碼: 1 String str1 = new String("hello"); 2 String str2 = new String("hello");

Java的物件和物件的引用

淺談java中的物件和引用 文章轉載出處:https://www.cnblogs.com/dolphin0520/p/3592498.html 在Java中,有一組名詞經常一起出現,它們就是“物件和物件引用”,很多朋友在初學Java的時候可能經常會混淆這2個概念,覺得它們是一回事,事

Java的this用法

基本用法 1.  this.變數名代表當前物件的成員變數。this.方法名代表當前物件的成員方法。this代表當前物件。 2. 當在內部類或匿名類中時,this代表其所在的內部類或匿名類,如果要用外部類的方法和變數,則加上外部類的類名。例如: public class He

java的反射機制

什麼是反射機制? Java反射機制是在執行過程中藉助Reflection API,對於任意一個類,都能夠知道這個類的所有屬性和方法;對於任何一個物件,都能夠呼叫它的任意一個方法,這種動態獲取的資訊以及動態呼叫物件的方法的功能成為java語言的反射機制。   java反射機制提供

Java強軟弱虛引用

什麼是強軟弱虛引用?為什麼要搞出來這四種引用?什麼情況下可以用到這四種引用呢?本文主要探討這三個問題! 1基本概念 強引用:在Java程式中通過呼叫構造器方法或者反射的方式創建出來的物件後通過一個引用指向它,並且在程式執行的過程中,可以通過引用鏈可以獲取到這個物件,那麼就可以說此處有一個強引用!

java的比較機制

對於程式語言來說,比較機制是核心機制之一,因為平時內建的比較機制,導致我對這個知識點沒有啥概念,總覺得他們本來就可以比較,最近在看Java容器的時候,發現原來Java做的封裝太好了,導致我們平時一般是不會接觸到這點。廢話不多說,來看下Java中的比較機制。 1:comparable介面

java的對象、類、與方法的重載

ring bat spring 初始 [] myba strong 有關 .html 對象: 一切皆為對象。 對象包括兩部分內容:屬性(名詞形容詞),行為(動詞)。 對象和對象之間是有關系的: 派生,關聯,依賴。 類: 對同一類別的眾多對象的一種抽象。 類,還是用來生成對象

Java併發(十九):final實現原理 Java的final關鍵字

final在Java中是一個保留的關鍵字,可以宣告成員變數、方法、類以及本地變數。 一旦你將引用宣告作final,你將不能改變這個引用了,編譯器會檢查程式碼,如果你試圖將變數再次初始化的話,編譯器會報編譯錯誤。 一、final變數   final成員變量表示常量,只能被賦值一次,賦值後值不再改變(fin

java遍歷Map的幾種方法

java中的map遍歷有多種方法,從最早的Iterator,到java5支援的foreach,再到java8 Lambda,讓我們一起來看下具體的用法以及各自的優缺點 先初始化一個map public class TestMap { public static Map<Intege

java筆記】Java的equals和==

在初學Java時,可能會經常碰到下面的程式碼: String str1 = new String("hello"); String str2 = new String("hello"); System.out.println(str1==str2); System.out.

java的i=i++

淺談java中的 “i=i++;” 首先有一下程式碼: public class Demo{ public static void main(String []args){ int i=12;

Java的物件和引用

Java物件及其引用  原文:http://zwmf.iteye.com/blog/1738574 關於物件與引用之間的一些基本概念。         初學Java時,在很長一段時間裡,總覺得基本概念很模糊。後來才知道,在許多Java書中,把物件和物件的引用混為一談。可是,

java"&&"和"&"的區別

“&&”和”&”都是java中的邏輯運算子,並且它們都表示“邏輯與”即“同真則真,有一假則假”,它們的區別在於”&&”具有短路功能,即如果左邊是false,則右邊的邏輯表示式不會執行。而”&”沒有短路功能,無論左邊是false還是true右邊都會執行。