Java中的隊列同步器AQS
一、AQS概念
1、隊列同步器是用來構建鎖或者其他同步組件的基礎框架,使用一個int型變量代表同步狀態,通過內置的隊列來完成線程的排隊工作。
2、下面是JDK8文檔中對於AQS的部分介紹
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable 提供一個框架,用於實現依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(信號量,事件等)。 該類被設計為大多數類型的同步器的有用依據,這些同步器依賴於單個原子int值來表示狀態。子類必須定義改變此狀態的protected方法,以及根據該對象被獲取或釋放來定義該狀態的含義。給定這些,這個類中的其他方法執行所有排隊和阻塞機制。 子類可以保持其他狀態字段,但只以
原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對於同步。 此類支持默認獨占模式和共享模式。 當以獨占模式獲取時,嘗試通過其他線程獲取不能成功。 多線程獲取的共享模式可能(但不需要)成功。 除了在機械意義上,這個類不理解這些差異,當共享
模式獲取成功時,下一個等待線程(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的線程共享相同的FIFO隊列。 通常,實現子類只支持這些模式之一,但是兩者都可以在ReadWriteLock中發揮作用。僅支持獨占或僅共享模式的子類不需要定義支持未使用模式的方法。
總結來說就是:
①子類通過繼承AQS並實現其抽象方法來管理同步狀態,對於同步狀態的更改通過提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來進行操作,因為使用CAS操作保證同步狀態的改變是原子的。
②子類被推薦定義為自定義同步組件的靜態內部類,同步器本身並沒有實現任何的同步接口,僅僅是定義了若幹狀態獲取和釋放的方法來提供自定義同步組件的使用。
③同步器既可以支持獨占式的獲取同步狀態,也可以支持共享式的獲取同步狀態(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類型的同步組件)
3、同步器是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義;
二、AQS的接口和實例
1、同步器的設計實現原理
繼承同步器並且重寫指定的方法,然後將同步器組合在自定義同步組件的實現中,並且調用同步器提供的模板方法(這些模板方法會調用重寫的方法);而重寫指定的方法的時候,需要使用getState()、setState(int state)、compareAndSetState(int expect, int update)來訪問或者更新同步狀態。下面是源碼中state變量和三個方法的定義聲明實現
1 /** 2 * The synchronization state.(同步狀態) 3 */ 4 private volatile int state; 5 6 /** 7 * Returns the current value of synchronization state.(返回當前的同步狀態) 8 * This operation has memory semantics of a {@code volatile} read. 9 * 此操作的內存語義為@code volatile read 10 * @return current state value 11 */ 12 protected final int getState() { 13 return state; 14 } 15 16 /** 17 * Sets the value of synchronization state.(設置新的同步狀態) 18 * This operation has memory semantics of a {@code volatile} write. 19 * 此操作的內存語義為@code volatile read 20 * @param newState the new state value 21 */ 22 protected final void setState(int newState) { 23 state = newState; 24 } 25 26 /** 27 * Atomically sets synchronization state to the given updated 28 * value if the current state value equals the expected value.(如果要更新的狀態和期望的狀態相同,那就通過原子的方式更新狀態) 29 * This operation has memory semantics of a {@code volatile} read 30 * and write.( 此操作的內存語義為@code volatile read 和 write) 31 * 32 * @param expect the expected value 33 * @param update the new value 34 * @return {@code true} if successful. False return indicates that the actual 35 * value was not equal to the expected value.(如果更新的狀態和期望的狀態不同就返回false) 36 */ 37 protected final boolean compareAndSetState(int expect, int update) { 38 // See below for intrinsics setup to support this 39 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 40 }
2、下面介紹AQS提供可被重寫的方法
①tryAcquire()方法
/** * 獨占式的獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然後再進行CAS設置同步狀態 * */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
②tryRelease()方法
/** * 獨占式的釋放同步狀態,等待獲取同步狀態的線程可以有機會獲取同步狀態 * */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
③tryAcquireShared()方法
/** * 嘗試以共享模式獲取。 該方法應該查詢對象的狀態是否允許在共享模式下獲取該對象,如果是這樣,就可以獲取它。 該方法總是由執行獲取的線程調用。
* 如果此方法報告失敗,則獲取方法可能將線程排隊(如果尚未排隊),直到被其他線程釋放為止。 獲取失敗時返回負值,如果在獲取成共享模式下功但沒
* 有後續共享模式獲取可以成功,則為零; 並且如果以共享模式獲取成功並且隨後的共享模式獲取可能成功,則為正值,在這種情況下,後續等待線程必須檢查可用性。 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); //如果不支持共享模式 ,會拋出該異常 }
④tryReleaseShared()方法
/** * 嘗試將狀態設置為以共享模式釋放同步狀態。 該方法總是由執行釋放的線程調用。 */ protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); //如果不支持共享模式 ,會拋出該異常 }
⑤isHeldExclusively() 方法
/** * 當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占 */ protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException(); //如果不支持共享模式 ,會拋出該異常 }
3、同步器提供的模板方法
在實現自定義同步組件的時候,需要重寫上面的方法,而下面的模板方法會調用上面重寫的方法。下面介紹同步器提供的模板方法
①acquire()方法
/** * 以獨占模式獲取,忽略中斷。 通過調用至少一次tryAcquire(int)實現,成功返回。 否則線 * 程排隊,可能會重復阻塞和解除阻塞,直到成功才調用tryAcquire(int) */ public final void acquire(int arg) {...}
②acquireInterruptibly()方法
/** * 以獨占方式獲得,如果中斷,中止。 通過首先檢查中斷狀態,然後調用至少一次 * tryAcquire(int) ,成功返回。 否則線程排隊,可能會重復阻塞和解除阻塞,調用 * tryAcquire(int)直到成功或線程中斷。 */ public final void acquireInterruptibly(int arg) throws InterruptedException {...}
③tryAcquireNanos()方法
/** * 嘗試以獨占模式獲取,如果中斷則中止,如果給定的超時時間失敗。 首先檢查中斷狀態,然 * 後調用至少一次tryAcquire(int) ,成功返回。 否則,線程排隊,可能會重復阻塞和解除阻 * 塞,調用tryAcquire(int)直到成功或線程中斷或超時 */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...}
④acquireShared()方法
/** * 以共享模式獲取,忽略中斷。 通過首次調用至少一次執行 tryAcquireShared(int),成功返 * 回。 否則線程排隊,可能會重復阻塞和解除阻塞,直到成功調用tryAcquireShared(int) 。 */ public final void acquireShared(int arg){...}
⑤acquireSharedInterruptibly()方法
/** * 以共享方式獲取,如果中斷,中止。 首先檢查中斷狀態,然後調用至少一次 * tryAcquireShared(int) ,成功返回。 否則線程排隊,可能會重復阻塞和解除阻塞,調用 * tryAcquireShared(int)直到成功或線程中斷。 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException{...}
⑥tryAcquireSharedNanos()方法
/** * 嘗試以共享模式獲取,如果中斷則中止,如果給定的時間超過,則失敗。 通過首先檢查中斷 * 狀態,然後調用至少一次tryAcquireShared(int) ,成功返回。 否則,線程排隊,可能會重 * 復阻塞和解除阻塞,調用tryAcquireShared(int)直到成功或線程中斷或超時。 */ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{...}
⑦release()方法
/** * 獨占式的釋放同步狀態,該方法會在釋放同步狀態之後,將同步隊列中的第一個節點包含的線程喚醒 */ public final boolean release(int arg){...}
⑧releaseShared()方法
/** * 共享式的釋放同步狀態 */ public final boolean releaseShared(int arg){...}
⑨getQueuedThreads()方法
/** * 獲取在等待隊列上的線程集合 */ public final Collection<Thread> getQueuedThreads(){...}
Java中的隊列同步器AQS