1. 程式人生 > >AQS CAS簡單詳解(寫的比較好)

AQS CAS簡單詳解(寫的比較好)

CAS(Compare And Swap)

什麼是CAS

CAS(Compare And Swap),即比較並交換。是解決多執行緒並行情況下使用鎖造成效能損耗的一種機制,CAS操作包含三個運算元——記憶體位置(V)、預期原值(A)和新值(B)。如果記憶體位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。

在JAVA中,sun.misc.Unsafe 類提供了硬體級別的原子操作來實現這個CAS。 java.util.concurrent

 包下的大量類都使用了這個 Unsafe.java 類的CAS操作。至於 Unsafe.java 的具體實現這裡就不討論了。

CAS典型應用

java.util.concurrent.atomic 包下的類大多是使用CAS操作來實現的(eg. AtomicInteger.java,AtomicBoolean,AtomicLong)。下面以 AtomicInteger.java 的部分實現來大致講解下這些原子類的實現。

public class AtomicInteger extends Number implements java.io.Serializable {
    private
static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private volatile int value;// 初始int大小 // 省略了部分程式碼... // 帶引數建構函式,可設定初始int大小 public AtomicInteger(int initialValue) { value = initialValue; } // 不帶引數建構函式,初始int大小為0
public AtomicInteger() { } // 獲取當前值 public final int get() { return value; } // 設定值為 newValue public final void set(int newValue) { value = newValue; } //返回舊值,並設定新值為 newValue public final int getAndSet(int newValue) { /** * 這裡使用for迴圈不斷通過CAS操作來設定新值 * CAS實現和加鎖實現的關係有點類似樂觀鎖和悲觀鎖的關係 * */ for (;;) { int current = get(); if (compareAndSet(current, newValue)) return current; } } // 原子的設定新值為update, expect為期望的當前的值 public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } // 獲取當前值current,並設定新值為current+1 public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } // 此處省略部分程式碼,餘下的程式碼大致實現原理都是類似的 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

一般來說在競爭不是特別激烈的時候,使用該包下的原子操作效能比使用 synchronized 關鍵字的方式高效的多(檢視getAndSet(),可知如果資源競爭十分激烈的話,這個for迴圈可能換持續很久都不能成功跳出。不過這種情況可能需要考慮降低資源競爭才是)。 
在較多的場景我們都可能會使用到這些原子類操作。一個典型應用就是計數了,在多執行緒的情況下需要考慮執行緒安全問題。通常第一映像可能就是:

public class Counter {
    private int count;
    public Counter(){}
    public int getCount(){
        return count;
    }
    public void increase(){
        count++;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

上面這個類在多執行緒環境下會有執行緒安全問題,要解決這個問題最簡單的方式可能就是通過加鎖的方式,調整如下:

public class Counter {
    private int count;
    public Counter(){}
    public synchronized int getCount(){
        return count;
    }
    public synchronized void increase(){
        count++;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

這類似於悲觀鎖的實現,我需要獲取這個資源,那麼我就給他加鎖,別的執行緒都無法訪問該資源,直到我操作完後釋放對該資源的鎖。我們知道,悲觀鎖的效率是不如樂觀鎖的,上面說了Atomic下的原子類的實現是類似樂觀鎖的,效率會比使用 synchronized 關係字高,推薦使用這種方式,實現如下:

public class Counter {
    private AtomicInteger count = new AtomicInteger();
    public Counter(){}
    public int getCount(){
        return count.get();
    }
    public void increase(){
        count.getAndIncrement();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

AQS(AbstractQueuedSynchronizer)

什麼是AQS

AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用於實現基於FIFO等待佇列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計為作為一些可用原子int值來表示狀態的同步器的基類。如果你有看過類似 CountDownLatch 類的原始碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer 的內部類 Sync。可見 CountDownLatch 是基於AQS框架來實現的一個同步器.類似的同步器在JUC下還有不少。(eg. Semaphore)

AQS用法

如上所述,AQS管理一個關於狀態資訊的單一整數,該整數可以表現任何狀態。比如, Semaphore 用它來表現剩餘的許可數,ReentrantLock 用它來表現擁有它的執行緒已經請求了多少次鎖;FutureTask 用它來表現任務的狀態(尚未開始、執行、完成和取消)

 To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如JDK的文件中所說,使用AQS來實現一個同步器需要覆蓋實現如下幾個方法,並且使用getState,setState,compareAndSetState這幾個方法來設定獲取狀態 
1. boolean tryAcquire(int arg) 
2. boolean tryRelease(int arg) 
3. int tryAcquireShared(int arg) 
4. boolean tryReleaseShared(int arg) 
5. boolean isHeldExclusively()

以上方法不需要全部實現,根據獲取的鎖的種類可以選擇實現不同的方法,支援獨佔(排他)獲取鎖的同步器應該實現tryAcquire、 tryReleaseisHeldExclusively而支援共享獲取的同步器應該實現tryAcquireSharedtryReleaseSharedisHeldExclusively。下面以 CountDownLatch 舉例說明基於AQS實現同步器, CountDownLatch 用同步狀態持有當前計數,countDown方法呼叫 release從而導致計數器遞減;當計數器為0時,解除所有執行緒的等待;await呼叫acquire,如果計數器為0,acquire 會立即返回,否則阻塞。通常用於某任務需要等待其他任務都完成後才能繼續執行的情景。原始碼如下:

public class CountDownLatch {
    /**
     * 基於AQS的內部Sync
     * 使用AQS的state來表示計數count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // 使用AQS的getState()方法設定狀態
            setState(count);
        }

        int getCount() {
            // 使用AQS的getState()方法獲取狀態
            return getState();
        }

        // 覆蓋在共享模式下嘗試獲取鎖
        protected int tryAcquireShared(int acquires) {
            // 這裡用狀態state是否為0來表示是否成功,為0的時候可以獲取到返回1,否則不可以返回-1
            return (getState() == 0) ? 1 : -1;
        }

        // 覆蓋在共享模式下嘗試釋放鎖
        protected boolean tryReleaseShared(int releases) {
            // 在for迴圈中Decrement count直至成功;
            // 當狀態值即count為0的時候,返回false表示 signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    // 使用給定計數值構造CountDownLatch
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 讓當前執行緒阻塞直到計數count變為0,或者執行緒被中斷
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 阻塞當前執行緒,除非count變為0或者等待了timeout的時間。當count變為0時,返回true
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // count遞減
    public void countDown() {
        sync.releaseShared(1);
    }

    // 獲取當前count值
    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}