1. 程式人生 > 實用技巧 >07-多執行緒筆記-2-鎖-3-Lock-1-AQS

07-多執行緒筆記-2-鎖-3-Lock-1-AQS

AbstractQueuedSynchronizer簡稱AQS,它是java.util.concurrent包下CountDownLatch/FutureTask/ReentrantLock/RenntrantReadWriteLock/Semaphore實現的基礎。

AQS通過內部實現的FIFO等待佇列來完成資源獲取執行緒的等待工作,如果當前執行緒獲取資源失敗,AQS則會將當前執行緒以及等待狀態等資訊構造成一個Node結構的節點,並將其加入等待佇列中,同時會阻塞當前執行緒;當其它獲取到資源的執行緒釋放持有的資源時,則會把等待佇列節點中的執行緒喚醒,使其再次嘗試獲取對應資源。

AQS是一個抽象類,當我們繼承AQS去實現自己的同步器時,要做的僅僅是根據自己同步器需要滿足的性質實現執行緒獲取和釋放資源的方式(修改同步狀態變數的方式)即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊、喚醒出隊、以及執行緒在佇列中行為的管理等),AQS在其頂層已經幫我們實現好了,AQS的這種設計使用的正是模板方法模式。

參考JDK11

AbstractOwnableSynchronizer

執行緒專有同步器,此類為建立鎖和可能需要所有權概念的相關同步器提供了基礎。這個類是AbstractQueuedSynchronizer的父類;

此類只有一個變數

/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;

和對應的get/set函式;在java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease

java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire等函式中用於制定擁有鎖的執行緒;

例項屬性

AQS使用一個int成員變數state去表徵當前資源的同步狀態。AQS使用CAS對該同步狀態進行原子操作實現對其值的修改。

private volatile int state;

AQS持有head指標和tail指標,頭結點是搶佔鎖成功而持有鎖的執行緒對應的結點,若有執行緒搶鎖失敗,AQS會建立新結點並用CAS操作使其成為新的尾結點

/**
* Head of the wait queue, lazily initialized.Except for
* initialization, it is modified only via method setHead.Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized.Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

Node

AQS通過維護一個等待獲取鎖的執行緒佇列來管理(獲取資源失敗入隊/喚醒出隊)搶佔鎖的執行緒,這個佇列是一種 CLH介紹的變體。

AQS把對某執行緒的一些控制資訊放到了其前驅中維護,當某結點的前驅釋放鎖或被取消時會喚醒其後繼,而其後繼會在獲取鎖成功後將自己設為新的頭結點,AQS對這個維護等待執行緒佇列的操作都是非阻塞的,也是執行緒安全的。佇列中的每個結點都是類Node的一個例項。

共享

AQS支援執行緒搶佔兩種鎖——獨佔鎖和共享鎖:

  • 獨佔鎖:同一個時刻只能被一個執行緒佔有,如ReentrantLock,ReentrantWriteLock等;
  • 共享鎖:同一時間點可以被多個執行緒同時佔有,如ReentrantReadLock,Semaphore等;

在Node類中,有兩個屬性用於標識當前執行緒請求的是獨佔執行緒還是共享執行緒;

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

等待狀態

Node節點中,維護waitStatus屬性標識執行緒的等待狀態; waitStatus初始預設為0,Condition佇列中初始預設為-2;它有以下幾種取值:

//結點已被取消,表示執行緒放棄搶鎖,結點狀態以後不再變直到GC回收它
static final int CANCELLED = 1;
//結點的後繼已經或很快就阻塞,在結點釋放鎖或被取消時要喚醒其後面第1個非CANCELLED結點
static final int SIGNAL = -1;

/** Condition佇列中結點的狀態,CLH佇列中結點沒有該狀態,當Condition的signal方法被呼叫,
Condition佇列中的結點被轉移進CLH佇列並且狀態變為0 **/
static final int CONDITION = -2;
//與共享模式相關,當執行緒以共享模式去獲取或釋放鎖時,對後續執行緒的釋放動作需要不斷往後傳播
static final int PROGAGATE = -3;

0:新結點入隊時的預設狀態。

負值表示結點處於有效等待狀態,而正值表示結點已被取消。所以原始碼中很多地方用>0、<0來判斷結點的狀態是否正常。

模板方法

AQS提供了針對鎖操作的一些目標方法,這些方法需要在子類中實現,像tryReleaseShared/tryAcquireShared,tryAcquire/tryRelease

獨佔鎖

獲取

如果成功獲取鎖,則返回;如果獲取鎖失敗,則將將當前執行緒加入到等待佇列中,並在佇列中執行自旋獲取鎖,直到成功獲取鎖或者執行緒滿足阻塞條件而阻塞;當前執行緒只有被前一個有效等待節點記錄後才能進入阻塞狀態。前一個有效節點釋放鎖後,會通知處於阻塞狀態的當前執行緒,然後當前執行緒重新自旋獲取鎖;
如果在獲取鎖的過程中出現異常,會將當前節點從等待佇列中刪除。

acquire(int arg)方法是獨佔模式下執行緒獲取共享資源的頂層入口。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

函式流程如下:
(1)tryAcquire()嘗試直接去獲取資源,如果成功則直接返回(這裡體現了非公平鎖,每個執行緒獲取鎖時會嘗試直接搶佔加塞一次,而CLH佇列中可能還有別的執行緒在等待);
(2)addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
(3)acquireQueued()使執行緒阻塞在等待佇列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
(4)如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

加入CLH佇列的過程不再描述,主要對acquireQueued()進行說明

  • acquireQueued(Node, int)

    /**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    */
    final boolean acquireQueued(final Node node, int arg) {
    	// 當前執行緒是否被中斷
    	boolean interrupted = false;
    	try {
    		// 自旋獲取資源
    		for (;;) {
    			// 獲取當前執行緒對應節點的前驅結點
    			final Node p = node.predecessor();
    			// 如果前驅結點是頭節點,當前執行緒有獲取資源的資格,然後獲取資源
    			if (p == head && tryAcquire(arg)) {
    				// 如果獲取資源成功,修改佇列頭指標指向自己
    				setHead(node);
    				// 釋放當前節點的頭節點示例
    				p.next = null; // help GC
    				// 返回中斷狀態
    				return interrupted;
    			}
    			// 沒有獲取資源資格,或者獲取資源失敗,需要執行shouldParkAfterFailedAcquire方法,用於確定當前結點對應的執行緒是否可以進入阻塞狀態
    			if (shouldParkAfterFailedAcquire(p, node))
    				// 如果可以進入阻塞狀態,執行parkAndCheckInterrupt方法以阻塞當前執行緒
    				interrupted |= parkAndCheckInterrupt();
    		}
    	// 如果執行tryAcquire失敗,取消失敗節點
    	} catch (Throwable t) {
    		// 刪除當前節點
    		cancelAcquire(node);
    		if (interrupted)
    			selfInterrupt();
    		throw t;
    	}
    }
    
    

釋放

釋放鎖分兩步,第一步釋放資源,第二步喚醒當前節點下一個可用的節點對應的執行緒;

共享鎖

獲取

共享資源的獲取,獲取成功則直接返回,獲取失敗則進入等待佇列,直到獲取到資源為止,整個過程忽略中斷。

public final void acquireShared(int arg) {
	// 如果成功獲取共享鎖,直接返回;否則,進入等待佇列
	if (tryAcquireShared(arg) < 0)
		// 進入等待佇列,自旋獲取鎖或阻塞
		doAcquireShared(arg);
}

/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
	// 加入到等待佇列
	final Node node = addWaiter(Node.SHARED);
	// 是否中斷標識,此處也是在獲取鎖結束後響應中斷,只有在獲取中斷鎖(acquireSharedInterruptibly)時,才會在獲取鎖的過程中響應中斷
	boolean interrupted = false;
	try {
		for (;;) {
			// 獲取前繼節點
			final Node p = node.predecessor();
			// 如果前繼節點為頭節點(成功獲取了鎖的節點),則此階段有資格獲取鎖
			if (p == head) {
				//嘗試獲取鎖
				int r = tryAcquireShared(arg);
				// 獲取鎖成功
				if (r >= 0) {
					// 成功獲取鎖後,將自身設定為等待佇列的頭節點(之前的頭節點會出隊,被GC回收),並喚醒所有後續的共享鎖申請節點
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					return;
				}
			}
			// 如果沒有獲取鎖資格,判斷是否可以進入阻塞狀態;如果可以,執行緒進入阻塞狀態
			if (shouldParkAfterFailedAcquire(p, node))
				interrupted |= parkAndCheckInterrupt();
		}
	} catch (Throwable t) {
		// 如果請求共享鎖發生異常,將當前節點從等待佇列中刪除,取消申請鎖流程
		cancelAcquire(node);
		throw t;
	} finally {
		// 響應中斷
		if (interrupted)
			selfInterrupt();
	}
}

/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
	Node h = head; // Record old head for check below
	// 修改等待佇列的頭節點,之前的頭節點出隊,等待GC回收
	setHead(node);
	/*
	 * Try to signal next queued node if:
	 *Propagation was indicated by caller,
	 *or was recorded (as h.waitStatus either before
	 *or after setHead) by a previous operation
	 *(note: this uses sign-check of waitStatus because
	 *PROPAGATE status may transition to SIGNAL.)
	 * and
	 *The next node is waiting in shared mode,
	 *or we don't know, because it appears null
	 *
	 * The conservatism in both of these checks may cause
	 * unnecessary wake-ups, but only when there are multiple
	 * racing acquires/releases, so most need signals now or soon
	 * anyway.
	 */
	// 如果資源有剩餘量,並且頭節點等待狀態小於0(小於0表示頭節點後續有阻塞節點),則繼續處理後續節點
	if (propagate > 0 || h == null || h.waitStatus < 0 ||
		(h = head) == null || h.waitStatus < 0) {
		Node s = node.next;
		// 如果後續節點是申請共享鎖的,則喚醒後續節點
		if (s == null || s.isShared())
			doReleaseShared();
	}
}

/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
	/*
	 * Ensure that a release propagates, even if there are other
	 * in-progress acquires/releases.This proceeds in the usual
	 * way of trying to unparkSuccessor of head if it needs
	 * signal. But if it does not, status is set to PROPAGATE to
	 * ensure that upon release, propagation continues.
	 * Additionally, we must loop in case a new node is added
	 * while we are doing this. Also, unlike other uses of
	 * unparkSuccessor, we need to know if CAS to reset status
	 * fails, if so rechecking.
	 */
	for (;;) {
		//記錄當前頭節點,後面會喚醒後繼結點,如果後繼結點成功獲取共享鎖,會修改等待佇列的頭節點;
		Node h = head;
		//
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			// 如果節點等待狀態為SIGNAL,後續肯定有阻塞節點,則喚起後續節點
			if (ws == Node.SIGNAL) {
				// 將當前節點的等待狀態置為0(初始化狀態)
				if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
					continue;// loop to recheck cases
				// 喚醒後續節點
				unparkSuccessor(h);
			}
			// 如果當前節點的等待狀態為0(節點初始化,無後續節點),將節點狀態修改為PRORAGATE,表示一旦出現一個新的共享結點連線在該結點後,該結點的共享鎖將傳播下去。
			else if (ws == 0 &&
					 !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
				continue;// loop on failed CAS
		}
		// 如果後繼結點未能成功獲取共享鎖(獲取成功後會修改頭節點),結束迴圈
		if (h == head)// loop if head changed
			break;
	}
}

釋放

共享鎖的釋放後,會喚醒後續節點。

public final void acquireShared(int arg) {
	// 如果成功獲取共享鎖,直接返回;否則,進入等待佇列
	if (tryAcquireShared(arg) < 0)
		// 進入等待佇列,自旋獲取鎖或阻塞
		doAcquireShared(arg);
}

/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
	// 加入到等待佇列
	final Node node = addWaiter(Node.SHARED);
	// 是否中斷標識,此處也是在獲取鎖結束後響應中斷,只有在獲取中斷鎖(acquireSharedInterruptibly)時,才會在獲取鎖的過程中響應中斷
	boolean interrupted = false;
	try {
		for (;;) {
			// 獲取前繼節點
			final Node p = node.predecessor();
			// 如果前繼節點為頭節點(成功獲取了鎖的節點),則此階段有資格獲取鎖
			if (p == head) {
				//嘗試獲取鎖
				int r = tryAcquireShared(arg);
				// 獲取鎖成功
				if (r >= 0) {
					// 成功獲取鎖後,將自身設定為等待佇列的頭節點(之前的頭節點會出隊,被GC回收),並喚醒所有後續的共享鎖申請節點
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					return;
				}
			}
			// 如果沒有獲取鎖資格,判斷是否可以進入阻塞狀態;如果可以,執行緒進入阻塞狀態
			if (shouldParkAfterFailedAcquire(p, node))
				interrupted |= parkAndCheckInterrupt();
		}
	} catch (Throwable t) {
		// 如果請求共享鎖發生異常,將當前節點從等待佇列中刪除,取消申請鎖流程
		cancelAcquire(node);
		throw t;
	} finally {
		// 響應中斷
		if (interrupted)
			selfInterrupt();
	}
}

/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
	Node h = head; // Record old head for check below
	// 修改等待佇列的頭節點,之前的頭節點出隊,等待GC回收
	setHead(node);
	/*
	 * Try to signal next queued node if:
	 *Propagation was indicated by caller,
	 *or was recorded (as h.waitStatus either before
	 *or after setHead) by a previous operation
	 *(note: this uses sign-check of waitStatus because
	 *PROPAGATE status may transition to SIGNAL.)
	 * and
	 *The next node is waiting in shared mode,
	 *or we don't know, because it appears null
	 *
	 * The conservatism in both of these checks may cause
	 * unnecessary wake-ups, but only when there are multiple
	 * racing acquires/releases, so most need signals now or soon
	 * anyway.
	 */
	// 如果資源有剩餘量,並且頭節點等待狀態小於0(小於0表示頭節點後續有阻塞節點),則繼續處理後續節點
	if (propagate > 0 || h == null || h.waitStatus < 0 ||
		(h = head) == null || h.waitStatus < 0) {
		Node s = node.next;
		// 如果後續節點為空,重新檢查等待佇列,或是申請共享鎖的,則喚醒後續節點
		if (s == null || s.isShared())
			doReleaseShared();
	}
}

/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
	/*
	 * Ensure that a release propagates, even if there are other
	 * in-progress acquires/releases.This proceeds in the usual
	 * way of trying to unparkSuccessor of head if it needs
	 * signal. But if it does not, status is set to PROPAGATE to
	 * ensure that upon release, propagation continues.
	 * Additionally, we must loop in case a new node is added
	 * while we are doing this. Also, unlike other uses of
	 * unparkSuccessor, we need to know if CAS to reset status
	 * fails, if so rechecking.
	 */
	for (;;) {
		//記錄當前頭節點,後面會喚醒後繼結點,如果後繼結點成功獲取共享鎖,會修改等待佇列的頭節點;
		Node h = head;
		//
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			// 如果節點等待狀態為SIGNAL,後續肯定有阻塞節點,則喚起後續節點
			if (ws == Node.SIGNAL) {
				// 將當前節點的等待狀態置為0(初始化狀態)
				if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
					continue;// loop to recheck cases
				// 喚醒後續節點
				unparkSuccessor(h);
			}
			// 如果當前節點的等待狀態為0(節點初始化,無後續節點),將節點狀態修改為PRORAGATE,表示一旦出現一個新的共享結點連線在該結點後,該結點的共享鎖將傳播下去。
			else if (ws == 0 &&
					 !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
				continue;// loop on failed CAS
		}
		// 如果後繼結點成功獲取共享鎖(獲取成功後會修改頭節點),結束迴圈
		if (h == head)// loop if head changed
			break;
	}
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
	/*
	 * If status is negative (i.e., possibly needing signal) try
	 * to clear in anticipation of signalling.It is OK if this
	 * fails or if status is changed by waiting thread.
	 */
	int ws = node.waitStatus;
	if (ws < 0)
		node.compareAndSetWaitStatus(ws, 0);

	/*
	 * Thread to unpark is held in successor, which is normally
	 * just the next node.But if cancelled or apparently null,
	 * traverse backwards from tail to find the actual
	 * non-cancelled successor.
	 */
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node p = tail; p != node && p != null; p = p.prev)
			if (p.waitStatus <= 0)
				s = p;
	}
	if (s != null)
		LockSupport.unpark(s.thread);
}

Condition/ConditionObject

在沒有Lock之前,我們使用synchronized來控制同步,配合Object的wait()、notify()系列方法可以實現等待/通知模式。在Java SE5後,Java提供了Lock介面,相對於Synchronized而言,Lock提供了條件Condition,對執行緒的等待、喚醒操作更加詳細和靈活。

Condition是一種廣義上的條件佇列。他為執行緒提供了一種更為靈活的等待/通知模式,Condition必須要配合鎖一起使用,因為對共享狀態變數的訪問發生在多執行緒環境下。一個Condition的例項必須與一個Lock繫結,因此Condition一般都是作為Lock的內部實現。

ConditionObject是AQS中的內部類,提供了條件鎖的同步實現,實現了Condition介面,並且實現了其中的await(),signal(),signalALL()等方法。在一個AQS同步器中,可以定義多個Condition,只需要多次lock.newCondition(),每次都會返回一個新的ConditionObject物件。在ConditionObject中,通過一個條件佇列來維護條線等待的執行緒。所以在一個同步器中可以有多個等待佇列,他們等待的條件是不一樣的。

條件佇列

條件佇列是一個FIFO的佇列,在佇列的每個節點都包含了一個執行緒引用。該執行緒就是在Condition物件上等待的執行緒。這裡的節點和AQS中的同步佇列中的節點一樣,使用的都是AbstractQueuedSynchronizer.Node類。每個呼叫了condition.await()的執行緒都會進入到 條件佇列中去。
在Condition中包含了firstWaiter和lastWaiter,每次加入到 條件佇列中的執行緒都會加入到 條件佇列的尾部,來構成一個FIFO的 條件佇列。

await()

JDK11中ConditionObject中的實現原始碼

public final void await() throws InterruptedException {
	// 如果當前執行緒中斷,丟擲異常
	if (Thread.interrupted())
		throw new InterruptedException();
	// 將當前執行緒加入到等待佇列的尾部
	Node node = addConditionWaiter();
	// 釋放執行緒佔用的鎖,如果釋放出現異常,標記當前節點為CANCELLED,後續會刪除此節點
	int savedState = fullyRelease(node);
	int interruptMode = 0;
	// 如果當前執行緒不在同步佇列中
	while (!isOnSyncQueue(node)) {
		// 阻塞當前執行緒,直到被喚醒或執行緒中斷(呼叫await方法後,上面完成了加入條件佇列,釋放鎖的過程,阻塞到此處,後續是被喚醒後的流程)
		LockSupport.park(this);
		// 如果執行緒中斷,結束迴圈
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	// 被喚醒後,會嘗試去獲取資源(資源資料量由之前儲存)
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		interruptMode = REINTERRUPT;
	// 如果當前節點後續還有等待執行緒,清理條件佇列(將當前節點清理出條件佇列)
	if (node.nextWaiter != null) // clean up if cancelled
		unlinkCancelledWaiters();
	// 對中斷進行處理
	if (interruptMode != 0)
		reportInterruptAfterWait(interruptMode);
}

文件