AtomicInteger原始碼及CAS深度分析
1、原子類
- 可以實現一些原子操作
- 基於CAS
下面就以AtomicInteger為例。
2、AtomicInteger
在沒有AtomicInteger之前,對於一個Integer的執行緒安全操作,是需要使用同步鎖來實現的,當然現在也可以通過ReentrantLock來實現,但是最好最方便的實現方式是採用AtomicInteger。
具體示例:
package com.collection.test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 原子類的測試
*/
public class AtomicTest {
private static AtomicInteger atomicInteger = new AtomicInteger();
//獲取當前值
public static void getCurrentValue(){
System.out.println(atomicInteger.get());//-->0
}
//設定value值
public static void setValue(){
atomicInteger.set(12);//直接用12覆蓋舊值
System.out.println(atomicInteger.get ());//-->12
}
//根據方法名稱getAndSet就知道先get,則最後返回的就是舊值,如果get在後,就是返回新值
public static void getAndSet(){
System.out.println(atomicInteger.getAndSet(15));//-->12
}
public static void getAndIncrement(){
System.out.println(atomicInteger.getAndIncrement());//-->15
}
public static void getAndDecrement(){
System.out.println(atomicInteger.getAndDecrement());//-->16
}
public static void getAndAdd(){
System.out.println(atomicInteger.getAndAdd(10));//-->15
}
public static void incrementAndGet(){
System.out.println(atomicInteger.incrementAndGet());//-->26
}
public static void decrementAndGet(){
System.out.println(atomicInteger.decrementAndGet());//-->25
}
public static void addAndGet(){
System.out.println(atomicInteger.addAndGet(20));//-->45
}
public static void main(String[] args) {
AtomicTest test = new AtomicTest();
test.getCurrentValue();
test.setValue();
//返回舊值系列
test.getAndSet();
test.getAndIncrement();
test.getAndDecrement();
test.getAndAdd();
//返回新值系列
test.incrementAndGet();
test.decrementAndGet();
test.addAndGet();
}
}
private volatile int value;// 初始化值
/**
* 建立一個AtomicInteger,初始值value為initialValue
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* 建立一個AtomicInteger,初始值value為0
*/
public AtomicInteger() {
}
/**
* 返回value
*/
public final int get() {
return value;
}
/**
* 為value設值(基於value),而其他操作是基於舊值<--get()
*/
public final void set(int newValue) {
value = newValue;
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 基於CAS為舊值設定新值,採用無限迴圈,直到設定成功為止
*
* @return 返回舊值
*/
public final int getAndSet(int newValue) {
for (;;) {
int current = get();// 獲取當前值(舊值)
if (compareAndSet(current, newValue))// CAS新值替代舊值
return current;// 返回舊值
}
}
/**
* 當前值+1,採用無限迴圈,直到+1成功為止
* @return the previous value 返回舊值
*/
public final int getAndIncrement() {
for (;;) {
int current = get();//獲取當前值
int next = current + 1;//當前值+1
if (compareAndSet(current, next))//基於CAS賦值
return current;
}
}
/**
* 當前值-1,採用無限迴圈,直到-1成功為止
* @return the previous value 返回舊值
*/
public final int getAndDecrement() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return current;
}
}
/**
* 當前值+delta,採用無限迴圈,直到+delta成功為止
* @return the previous value 返回舊值
*/
public final int getAndAdd(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
/**
* 當前值+1, 採用無限迴圈,直到+1成功為止
* @return the updated value 返回新值
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;//返回新值
}
}
/**
* 當前值-1, 採用無限迴圈,直到-1成功為止
* @return the updated value 返回新值
*/
public final int decrementAndGet() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return next;//返回新值
}
}
/**
* 當前值+delta,採用無限迴圈,直到+delta成功為止
* @return the updated value 返回新值
*/
public final int addAndGet(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;//返回新值
}
}
/**
* 獲取當前值
*/
public int intValue() {
return get();
}
說明:使用與原始碼都簡單到爆了!自己看看註釋。
注意:
value是volatile的,關於volatile的相關內容見《附2 volatile》,具體連結:http://www.cnblogs.com/java-zhao/p/5125698.html
單步操作:例如set()是直接對value進行操作的,不需要CAS,因為單步操作就是原子操作。
多步操作:例如getAndSet(int newValue)是兩步操作–>先獲取值,在設定值,所以需要原子化,這裡採用CAS實現。
對於方法是返回舊值還是新值,直接看方法是以get開頭(返回舊值)還是get結尾(返回新值)就好
CAS:比較CPU記憶體上的值是不是當前值current,如果是就換成新值update,如果不是,說明獲取值之後到設定值之前,該值已經被別人先一步設定過了,此時如果自己再設定值的話,需要在別人修改後的值的基礎上去操作,否則就會覆蓋別人的修改,所以這個時候會直接返回false,再進行無限迴圈,重新獲取當前值,然後再基於CAS進行加減操作。
如果還是不懂CAS,類比資料庫的樂觀鎖。
補充一個東西:
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
這是AtomicInteger的所有屬性,其中value存的是當前值,而當前值存放的記憶體地址可以通過valueOffset來確定。實際上是“value欄位相對Java物件的起始地址的偏移量”
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
CAS方法:通過對比“valueOffset上的value”與expect是否相同,來決定是否修改value值為update值。
CAS
CAS:Compare and Swap, 翻譯成比較並交換。
java.util.concurrent包中藉助CAS實現了區別於synchronouse同步鎖的一種樂觀鎖。
CAS應用
CAS有3個運算元,記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,將記憶體值V修改為B,否則什麼都不做。
非阻塞演算法 (nonblocking algorithms)
一個執行緒的失敗或者掛起不應該影響其他執行緒的失敗或掛起的演算法。
現代的CPU提供了特殊的指令,可以自動更新共享資料,而且能夠檢測到其他執行緒的干擾,而 compareAndSet() 就用這些代替了鎖定。
拿出AtomicInteger來研究在沒有鎖的情況下是如何做到資料正確性的。
private volatile int value;
首先毫無以為,在沒有鎖的機制下可能需要藉助volatile原語,保證執行緒間的資料是可見的(共享的)。
這樣才獲取變數的值的時候才能直接讀取。
public final int get() {
return value;
}
然後來看看++i是怎麼做到的。
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
} }
在這裡採用了CAS操作,每次從記憶體中讀取資料然後將此資料和+1後的結果進行CAS操作,如果成功就返回結果,否則重試直到成功為止。
而compareAndSet利用JNI來完成CPU指令的操作。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
整體的過程就是這樣子的,利用CPU的CAS指令,同時藉助JNI來完成Java的非阻塞演算法。其它原子操作都是利用類似的特性完成的。
其中
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
類似:
if (this == expect) {
this = update
return true;
} else {
return false;
}
那麼問題就來了,成功過程中需要2個步驟:比較this == expect,替換this = update,compareAndSwapInt如何這兩個步驟的原子性呢? 參考CAS的原理。
CAS缺點
CAS雖然很高效的解決原子操作,但是CAS仍然存在三大問題。ABA問題,迴圈時間長開銷大和只能保證一個共享變數的原子操作
- ABA問題。因為CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變數前面追加上版本號,每次變數更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。
從Java1.5開始JDK的atomic包裡提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法作用是首先檢查當前引用是否等於預期引用,並且當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該引用和該標誌的值設定為給定的更新值。
關於ABA問題參考文件: http://blog.hesey.net/2011/09/resolve-aba-by-atomicstampedreference.html - 迴圈時間長開銷大。自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。如果JVM能支援處理器提供的pause指令那麼效率會有一定的提升,pause指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它可以避免在退出迴圈的時候因記憶體順序衝突(memory order violation)而引起CPU流水線被清空(CPU pipeline flush),從而提高CPU的執行效率。
- 只能保證一個共享變數的原子操作。當對一個共享變數執行操作時,我們可以使用迴圈CAS的方式來保證原子操作,但是對多個共享變數操作時,迴圈CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變數合併成一個共享變數來操作。比如有兩個共享變數i=2,j=a,合併一下ij=2a,然後用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用物件之間的原子性,你可以把多個變數放在一個物件裡來進行CAS操作。
concurrent包的實現
由於java的CAS同時具有 volatile 讀和volatile寫的記憶體語義,因此Java執行緒之間的通訊現在有了下面四種方式:
A執行緒寫volatile變數,隨後B執行緒讀這個volatile變數。
A執行緒寫volatile變數,隨後B執行緒用CAS更新這個volatile變數。
A執行緒用CAS更新一個volatile變數,隨後B執行緒用CAS更新這個volatile變數。
A執行緒用CAS更新一個volatile變數,隨後B執行緒讀這個volatile變數。
Java的CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對記憶體執行讀-改-寫操作,這是在多處理器中實現同步的關鍵(從本質上來說,能夠支援原子性讀-改-寫指令的計算機器,是順序計算圖靈機的非同步等價機器,因此任何現代的多處理器都會去支援某種能對記憶體執行原子性讀-改-寫操作的原子指令)。同時,volatile變數的讀/寫和CAS可以實現執行緒之間的通訊。把這些特性整合在一起,就形成了整個concurrent包得以實現的基石。如果我們仔細分析concurrent包的原始碼實現,會發現一個通用化的實現模式:
首先,宣告共享變數為volatile;
然後,使用CAS的原子條件更新來實現執行緒之間的同步;
同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的記憶體語義來實現執行緒之間的通訊。
AQS,非阻塞資料結構和原子變數類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴於這些基礎類來實現的。從整體來看,concurrent包的實現示意圖如下: