1. 程式人生 > >AQS(抽象佇列同步器)

AQS(抽象佇列同步器)

一、什麼是 AQS ?

AQS即AbstractQueuedSynchronizer的縮寫,是併發程式設計中實現同步器的一個框架。
AQS基於一個FIFO雙向佇列實現,被設計給那些依賴一個代表狀態的原子int值的同步器使用。我們都知道,既然叫同步器,那個肯定有個代表同步狀態(臨界資源)的東西,在AQS中即為一個叫state的int值,該值通過CAS進行原子修改。
在AQS中存在一個FIFO佇列,佇列中的節點表示被阻塞的執行緒,佇列節點元素有4中型別,每種型別表示執行緒被阻塞的原因,這四種類型分別是:

  • CANCELLED : 表示該執行緒是因為超時或者中斷原因而被放到佇列中
  • CONDITION : 表示該執行緒是因為某個條件不滿足而被放到佇列中,需要等待一個條件,直到條件成立後才會出隊
  • SIGNAL : 表示該執行緒需要被喚醒
  • PROPAGATE : 表示在共享模式下,當前節點執行釋放release操作後,當前結點需要傳播通知給後面所有節點

由於一個共享資源同一時間只能由一條執行緒持有,也可以被多個執行緒持有,因此AQS中存在兩種模式,如下:

  • 1、獨佔模式
    獨佔模式表示共享狀態值state每次只能由一條執行緒持有,其他執行緒如果需要獲取,則需要阻塞,如JUC中的ReentrantLock
  • 2、共享模式
    共享模式表示共享狀態值state每次可以由多個執行緒持有,如JUC中的CountDownLatch

二、Node原始碼

既然AQS是基於一個FIFO佇列的框架,那麼我們先來看下佇列的元素節點Node的資料結構,原始碼如下:

static final class Node {
    /**共享模式*/
    static final Node SHARED = new Node();
    /**獨佔模式*/
    static final Node EXCLUSIVE = null;

    /**標記執行緒由於中斷或超時,需要被取消,即踢出佇列*/
    static final int CANCELLED =  1;
    /**執行緒需要被喚醒*/
    static
final int SIGNAL = -1; /**執行緒正在等待一個條件*/ static final int CONDITION = -2; /** * 傳播 */ static final int PROPAGATE = -3; // waitStatus只取上面CANCELLED、SIGNAL、CONDITION、PROPAGATE四種取值之一 volatile int waitStatus; // 表示前驅節點 volatile Node prev; // 表示後繼節點 volatile Node next; // 佇列元素需要關聯一個執行緒物件 volatile Thread thread; // 表示下一個waitStatus值為CONDITION的節點 Node nextWaiter; /** * 是否當前結點是處於共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前一個節點,如果沒有前一個節點,則丟擲空指標異常 */ final Node predecessor() throws NullPointerException { // 獲取前一個節點的指標 Node p = prev; // 如果前一個節點不存在 if (p == null) throw new NullPointerException(); else // 否則返回 return p; } // 初始化頭節點使用 Node() {} /** * 當有執行緒需要入隊時,那麼就建立一個新節點,然後關聯該執行緒物件,由addWaiter()方法呼叫 */ Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } /** * 一個執行緒需要等待一個條件阻塞了,那麼就建立一個新節點,關聯執行緒物件 */ Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }

三、AQS中的共享狀態值

 /**
 * 同步狀態值
 * 使用volatile修飾,保證多執行緒修改的可見性
 */
private volatile int state;

/**
 * 獲取同步狀態值
 * 使用final修飾,子類不能覆蓋,只能呼叫
 */
protected final int getState() {
    return state;
}

/**
 * 修改同步狀態值
 */
protected final void setState(int newState) {
    state = newState;
}
/**
 * CAS修改state值
 */
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

四、AQS中的tryAcquire、tryRelease方法:

AQS類中,tryAcquire()、tryAcquireShared()、tryRelease()和tryReleaseShared()等方法用於獲取和釋放state,但AQS類這幾個方法都是直接丟擲了UnsupportedOperationException異常,這裡其實是使用了模板模式,子類只需重寫這是個方法即可(其實是其中2個,分別對應獨佔模式和共享模式)。

/**
 * 嘗試以獨佔模式獲取。 該方法應該查詢物件的狀態是否允許以獨佔模式獲取,如果是,則獲取它。 
 * 該方法總是由執行獲取的執行緒呼叫。 如果此方法報告失敗,則獲取方法可能將執行緒排隊(如果尚未排隊),直到被其他執行緒釋放為止。 這可以用於實現方法Lock.tryLock() 。 
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
/**
 * 嘗試以共享模式獲取。 該方法應該查詢物件的狀態是否允許在共享模式下獲取該物件,如果是這樣,就可以獲取它。 
 * 該方法總是由執行獲取的執行緒呼叫。 如果此方法報告失敗,則獲取方法可能將執行緒排隊(如果尚未排隊),直到被其他執行緒釋放為止。
 * @return int 失敗的負值 如果在共享模式下獲取成功但沒有後續共享模式獲取可以成功,則為零; 
 *(支援三種不同的返回值使得這種方法可以在僅獲取有時只能完全執行的上下文中使用。)成功後,該物件已被獲取
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
/**
 * 如果同步僅針對當前(呼叫)執行緒進行儲存,則返回true 。 每次呼叫不等待AbstractQueuedSynchronizer.ConditionObject方法時都會呼叫此方法。 (等待方法呼叫release(int)。 )
 * @return true如果同步是唯一的; 否則false 
 */
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

五、FIFO佇列維護,入隊(enqueue)和出隊(dequeue)原始碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    //無參構造器,供子類初始化呼叫
    protected AbstractQueuedSynchronizer() {}

    static final class Node {//... 省略... }

    private transient volatile Node head;
    private transient volatile Node tail;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     * 設定head就是dequeue方法,並且把相應的應用刪除,方便GC時回收
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     * 通知節點的繼承者
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//呼叫的都是Unsafe的native方法,直接通過屬性偏移量修改記憶體中的值

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //如果為空或狀態是CANCELLED,這從尾部開始向前找
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒這個執行緒
    }
    //... 省略 ...
}

六、基於AQS實現自定義同步類

Mutex是JDK文件的一個例項:

/**
 * 互斥鎖,實現LOCK介面,供外界呼叫
 */
class Mutex implements Lock, java.io.Serializable {
    // 自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判斷是否鎖定狀態
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 嘗試獲取資源,立即返回。成功則返回true,否則false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 這裡限定只能為1個量
            if (compareAndSetState(0, 1)) {//state為0才設定為1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//設定為當前執行緒獨佔資源
                return true;
            }
            return false;
        }

        // 嘗試釋放資源,立即返回。成功則為true,否則false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定為1個量
            if (getState() == 0)//既然來釋放,那肯定就是已佔有狀態了。只是為了保險,多層判斷!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//釋放資源,放棄佔有狀態
            return true;
        }
    }

    // 真正同步類的實現都依賴繼承於AQS的自定義同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。兩者語義一樣:釋放資源。
    public void unlock() {
        sync.release(1);
    }

    //鎖是否佔有狀態
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

除了Mutex,還有以下幾個同步類是通過AQS的同步器進行同步管理的,不同的地方在於tryAcquire-tryRelease的實現方式不一樣

  • ReentrantLock:可重入鎖,與mutex一樣實現了Lock介面,使用獨佔模式的同步器。
  • CountDownLatch:計數器,使用了共享模式的同步器進行多執行緒執行控制。
  • Semphore:一個計數訊號量,維持一個許可證池(只計數)每次執行前獲取許可證,執行完釋放許可證。類似限流演算法中的令牌桶演算法

相關推薦

AQS抽象佇列同步

一、什麼是 AQS ? AQS即AbstractQueuedSynchronizer的縮寫,是併發程式設計中實現同步器的一個框架。 AQS基於一個FIFO雙向佇列實現,被設計給那些依賴一個代表狀態的原子int值的同步器使用。我們都知道,既然叫同步器

JAVA AQS抽象佇列同步詳解

之前看到一篇很好的介紹AQS抽象佇列同步器的文章,分享下。 框架 維護了一個state(代表共享資源,注意是volatile修飾的保證可見性)和一個FIFO執行緒等待佇列(多執行緒爭用資源被阻塞時會進入此佇列)state的訪問方式有三種: getSt

[Java併發] AQS抽象佇列同步原始碼解析--鎖獲取過程

要深入瞭解java併發知識,AbstractQueuedSynchronizer(AQS)是必須要拿出來深入學習的,AQS可以說是貫穿了整個JUC併發包,例如ReentrantLock,CountDownLatch,CyclicBarrier等併發類都涉及到了AQS。接下來就對AQS的實現原理進行分析。 在開

[Java併發] AQS抽象佇列同步原始碼解析--獨佔鎖釋放過程

[Java併發] AQS抽象佇列同步器原始碼解析--獨佔鎖獲取過程 上一篇已經講解了AQS獨佔鎖的獲取過程,接下來就是對AQS獨佔鎖的釋放過程進行詳細的分析說明,廢話不多說,直接進入正文... 鎖釋放入口release(int arg) 首先進行說明下,能夠正常執行到release方法這裡來的執行緒都是獲取到

java 佇列同步AbstractQueuedSynchronizerAQS實現分析

AQS  內部使用一個int變數state表示同步狀態。 內部使用一個隱式的FIFO佇列(並沒有宣告這樣一個佇列,只是通過每個節點記錄它的上下節點來從邏輯上產生一個佇列)來完成阻塞執行緒的排隊。 這個FIFO佇列在 AQS 中被定義為一個內部類Node:

java 佇列同步AbstractQueuedSynchronizerAQS

  佇列同步器AbstractQueuedSynchronizer(AQS),是用來構建鎖或者其他同步元件的基礎框架如對獲取鎖失敗執行緒的阻塞、喚醒,都是AQS替我們實現,ReentrantLock,ReentrantReadWriteLock和CountDownLatch等都是用AQ

AbstractQueuedSynchronizer 佇列同步AQS

AbstractQueuedSynchronizer 佇列同步器(AQS) 佇列同步器 (AQS), 是用來構建鎖或其他同步元件的基礎框架,它通過使用 int 變量表示同步狀態,通過內建的 FIFO 的佇列完成資源獲取的排隊工作。(摘自《Java併發程式設計的藝術》) 我們知道獲取同步狀態有獨佔

Java併發程式設計佇列同步AQS

一、AQS簡介 佇列同步器AbstractQueuedSynchronizer(簡稱AQS)是用來構建鎖或其他同步元件的基礎框架,它服務的是鎖的實現者。AQS有一個變量表示同步狀態,通過內建的FIFO管理執行緒排隊,基於AQS可以將同步狀態管理、執行緒排隊、等待與喚醒等操作對鎖遮蔽,簡化鎖的實現

Java多執行緒程式設計-12-Java中的佇列同步AQS和ReentrantLock鎖原理簡要分析

原文出自 : https://blog.csdn.net/xlgen157387/article/details/78341626 一、Lock介面 在上一篇文章中: Java多執行緒程式設計-(5)-使用Lock物件實現同步以及執行緒間通訊 介紹

java併發程式設計--java中的鎖Lock介面和佇列同步AQS

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //內部類--節點 static final clas

Java 中佇列同步 AQSAbstractQueuedSynchronizer實現原理

#### 前言 在 Java 中通過鎖來控制多個執行緒對共享資源的訪問,使用 Java 程式語言開發的朋友都知道,可以通過 synchronized 關鍵字來實現鎖的功能,它可以隱式的獲取鎖,也就是說我們使用該關鍵字並不需要去關心鎖的獲取和釋放過程,但是在提供方便的同時也意味著其靈活性的下降。例如,有這樣的一

佇列同步 AQS

佇列同步器AbstractQueuedSynchronizer,是用來構建鎖或者其他同步元件的基礎框架,首先要知道這個共享變數的狀態(是否已經被其他執行緒鎖住等),這設定了一個int成員變量表示同步狀態,通過內建的先進先出佇列來完成資源獲取執行緒的排隊工作。一些執行緒無法獲取到共享資源等

【搞定Java併發程式設計】第17篇:佇列同步AQS原始碼分析之共享模式

AQS系列文章: 1、佇列同步器AQS原始碼分析之概要分析 2、佇列同步器AQS原始碼分析之獨佔模式 3、佇列同步器AQS原始碼分析之共享模式 4、佇列同步器AQS原始碼分析之Condition介面、等待佇列 通過上一篇文章的的分析,我們知道獨佔模式獲取同步狀態(或者說獲取鎖

【搞定Java併發程式設計】第16篇:佇列同步AQS原始碼分析之獨佔模式

AQS系列文章: 1、佇列同步器AQS原始碼分析之概要分析 2、佇列同步器AQS原始碼分析之獨佔模式 3、佇列同步器AQS原始碼分析之共享模式 4、佇列同步器AQS原始碼分析之Condition介面、等待佇列 本文主要講解佇列同步器AQS的獨佔模式:主要分為獨佔式同步狀態獲取

【搞定Java併發程式設計】第15篇:佇列同步AQS原始碼分析之概要分析

AQS系列文章: 1、佇列同步器AQS原始碼分析之概要分析 2、佇列同步器AQS原始碼分析之獨佔模式 3、佇列同步器AQS原始碼分析之共享模式 4、佇列同步器AQS原始碼分析之Condition介面、等待佇列 先推薦兩篇不錯的博文: 1、一行一行原始碼分析清楚Abstract

【搞定Java併發程式設計】第18篇:佇列同步AQS原始碼分析之Condition介面、等待佇列

AQS系列文章: 1、佇列同步器AQS原始碼分析之概要分析 2、佇列同步器AQS原始碼分析之獨佔模式 3、佇列同步器AQS原始碼分析之共享模式 4、佇列同步器AQS原始碼分析之Condition介面、等待佇列 通過前面三篇關於AQS文章的學習,我們深入瞭解了AbstractQ

C/C++ 在網路下的程式設計的應用時間同步

https://blog.csdn.net/worldmakewayfordream/article/details/24187833   寫一個基於UDP的時間伺服器。    時間伺服器提供的功能就是: 當客戶端傳送請求時,發回當前的系統時間。

Spring技術內幕——事務的建立,掛起,迴歸,提交事務攔截抽象事務管理

在涉及單個數據庫區域性事務的事務處理中,事務的最終實現和資料庫的支援是緊密相關的。對區域性資料庫事務來說,一個事務處理的操作單元往往對應著一系列的資料庫操作。 Spring事務處理主要分以下三個主要的過程: (1)讀取和處理在Spring IoC容器中配置的事務處理屬性,並

Java 佇列同步 AQS

> 本文部分摘自《Java 併發程式設計的藝術》 ## 概述 佇列同步器 AbstractQueuedSynchronize(以下簡稱同步器),是用來構建鎖(Lock)或者其他同步元件(JUC 併發包)的基礎框架,它使用了一個 int 成員變量表示同步狀態,通過內建的 FIFO 佇列來完成資源獲取

Android開發之AudioManager音頻管理具體解釋

應該 數量 service eth out 開發 要求 type 路由 AudioManager簡單介紹: AudioManager類提供了訪問音量和振鈴器mode控制。使用Context.getSystemService(Context.AUDIO_SERVICE)