AQS CAS簡單詳解(寫的比較好)
CAS(Compare And Swap)
什麼是CAS
CAS(Compare And Swap),即比較並交換。是解決多執行緒並行情況下使用鎖造成效能損耗的一種機制,CAS操作包含三個運算元——記憶體位置(V)、預期原值(A)和新值(B)。如果記憶體位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。
在JAVA中,sun.misc.Unsafe
類提供了硬體級別的原子操作來實現這個CAS。 java.util.concurrent
Unsafe.java
類的CAS操作。至於 Unsafe.java
的具體實現這裡就不討論了。
CAS典型應用
java.util.concurrent.atomic
包下的類大多是使用CAS操作來實現的(eg. AtomicInteger.java
,AtomicBoolean
,AtomicLong
)。下面以 AtomicInteger.java
的部分實現來大致講解下這些原子類的實現。
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile int value;// 初始int大小
// 省略了部分程式碼...
// 帶引數建構函式,可設定初始int大小
public AtomicInteger(int initialValue) {
value = initialValue;
}
// 不帶引數建構函式,初始int大小為0
public AtomicInteger() {
}
// 獲取當前值
public final int get() {
return value;
}
// 設定值為 newValue
public final void set(int newValue) {
value = newValue;
}
//返回舊值,並設定新值為 newValue
public final int getAndSet(int newValue) {
/**
* 這裡使用for迴圈不斷通過CAS操作來設定新值
* CAS實現和加鎖實現的關係有點類似樂觀鎖和悲觀鎖的關係
* */
for (;;) {
int current = get();
if (compareAndSet(current, newValue))
return current;
}
}
// 原子的設定新值為update, expect為期望的當前的值
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 獲取當前值current,並設定新值為current+1
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
// 此處省略部分程式碼,餘下的程式碼大致實現原理都是類似的
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
一般來說在競爭不是特別激烈的時候,使用該包下的原子操作效能比使用 synchronized
關鍵字的方式高效的多(檢視getAndSet()
,可知如果資源競爭十分激烈的話,這個for迴圈可能換持續很久都不能成功跳出。不過這種情況可能需要考慮降低資源競爭才是)。
在較多的場景我們都可能會使用到這些原子類操作。一個典型應用就是計數了,在多執行緒的情況下需要考慮執行緒安全問題。通常第一映像可能就是:
public class Counter {
private int count;
public Counter(){}
public int getCount(){
return count;
}
public void increase(){
count++;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
上面這個類在多執行緒環境下會有執行緒安全問題,要解決這個問題最簡單的方式可能就是通過加鎖的方式,調整如下:
public class Counter {
private int count;
public Counter(){}
public synchronized int getCount(){
return count;
}
public synchronized void increase(){
count++;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
這類似於悲觀鎖的實現,我需要獲取這個資源,那麼我就給他加鎖,別的執行緒都無法訪問該資源,直到我操作完後釋放對該資源的鎖。我們知道,悲觀鎖的效率是不如樂觀鎖的,上面說了Atomic下的原子類的實現是類似樂觀鎖的,效率會比使用 synchronized
關係字高,推薦使用這種方式,實現如下:
public class Counter {
private AtomicInteger count = new AtomicInteger();
public Counter(){}
public int getCount(){
return count.get();
}
public void increase(){
count.getAndIncrement();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
AQS(AbstractQueuedSynchronizer
)
什麼是AQS
AQS(AbstractQueuedSynchronizer
),AQS是JDK下提供的一套用於實現基於FIFO等待佇列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計為作為一些可用原子int值來表示狀態的同步器的基類。如果你有看過類似 CountDownLatch
類的原始碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer
的內部類 Sync
。可見 CountDownLatch
是基於AQS框架來實現的一個同步器.類似的同步器在JUC下還有不少。(eg. Semaphore
)
AQS用法
如上所述,AQS管理一個關於狀態資訊的單一整數,該整數可以表現任何狀態。比如, Semaphore
用它來表現剩餘的許可數,ReentrantLock
用它來表現擁有它的執行緒已經請求了多少次鎖;FutureTask
用它來表現任務的狀態(尚未開始、執行、完成和取消)
To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
如JDK的文件中所說,使用AQS來實現一個同步器需要覆蓋實現如下幾個方法,並且使用getState
,setState
,compareAndSetState
這幾個方法來設定獲取狀態
1. boolean tryAcquire(int arg)
2. boolean tryRelease(int arg)
3. int tryAcquireShared(int arg)
4. boolean tryReleaseShared(int arg)
5. boolean isHeldExclusively()
以上方法不需要全部實現,根據獲取的鎖的種類可以選擇實現不同的方法,支援獨佔(排他)獲取鎖的同步器應該實現tryAcquire
、 tryRelease
、isHeldExclusively
而支援共享獲取的同步器應該實現tryAcquireShared
、tryReleaseShared
、isHeldExclusively
。下面以 CountDownLatch
舉例說明基於AQS實現同步器, CountDownLatch
用同步狀態持有當前計數,countDown
方法呼叫
release從而導致計數器遞減;當計數器為0時,解除所有執行緒的等待;await
呼叫acquire,如果計數器為0,acquire
會立即返回,否則阻塞。通常用於某任務需要等待其他任務都完成後才能繼續執行的情景。原始碼如下:
public class CountDownLatch {
/**
* 基於AQS的內部Sync
* 使用AQS的state來表示計數count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
// 使用AQS的getState()方法設定狀態
setState(count);
}
int getCount() {
// 使用AQS的getState()方法獲取狀態
return getState();
}
// 覆蓋在共享模式下嘗試獲取鎖
protected int tryAcquireShared(int acquires) {
// 這裡用狀態state是否為0來表示是否成功,為0的時候可以獲取到返回1,否則不可以返回-1
return (getState() == 0) ? 1 : -1;
}
// 覆蓋在共享模式下嘗試釋放鎖
protected boolean tryReleaseShared(int releases) {
// 在for迴圈中Decrement count直至成功;
// 當狀態值即count為0的時候,返回false表示 signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
// 使用給定計數值構造CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 讓當前執行緒阻塞直到計數count變為0,或者執行緒被中斷
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 阻塞當前執行緒,除非count變為0或者等待了timeout的時間。當count變為0時,返回true
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// count遞減
public void countDown() {
sync.releaseShared(1);
}
// 獲取當前count值
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}