1. 程式人生 > >【搞定Java併發程式設計】第9篇:CAS詳解

【搞定Java併發程式設計】第9篇:CAS詳解

上一篇:volatile關鍵字詳解:https://blog.csdn.net/pcwl1206/article/details/84881395

目  錄

一、CAS基本概念

1.1、CAS的定義

1.2、CAS的3個運算元 

二、Java如何實現原子操作

2.1、相關概念

2.2、處理器如何實現原子操作

2.3、Java如何實現原子操作

三、原子變數

四、AtomicInteger原始碼解析

五、模擬CAS演算法


我們在上篇文章 volatile 關鍵字的講解中提到:volatile 不能保證變數狀態的“原子性操作”。

所謂的原子性就是:一個操作是不可中斷的,要麼全部執行成功要麼全部執行失敗,有著“同生共死”的感覺

在Java基礎系列文章中有一篇關於: i++ 和 ++i 詳解的文章,因為 i++操作不具有原子性。

今天我們就來講解Java提供的一種原子性的操作演算法:CAS演算法。先把CAS的關鍵概念列舉出來,後文再具體講解Java是如何實現其原子操作的。

一、CAS基本概念

1.1、CAS的定義

CAS(Compare  And  Swap):是一種硬體對併發的支援,針對多處理器操作而設計的處理器中的一種特殊指令,用於管理對共享資料的併發訪問。

CAS是一種無鎖的非阻塞演算法的實現。

1.2、CAS的3個運算元 

CAS包含了3個運算元:

1、記憶體值:需要讀寫的記憶體值V;

2、預估值:進行比較的值A,即記憶體中的舊值;

3、更新值:擬寫入的新值B。

  • 當且僅當 V = A 時,CAS通過原子方式用新值 B 來更新 V 的值,否則不會執行任何操作。

二、Java如何實現原子操作

本小節內容參考:https://blog.csdn.net/a724888/article/details/60871077

2.1、相關概念

  • 原子和原子操作

原子(atom)本意是“不能被進一步分割的最小粒子”,而原子操作(atomic operation)意為”不可被中斷的一個或一系列操作” 。

在多處理器上實現原子操作就變得有點複雜。下文中會講解在Inter處理器和Java裡是如何實現原子操作的。

  • 相關術語
術語名稱 英文名稱 解釋
快取行 Cache line 快取的最小操作單位
比較並交換 Compare and Swap CAS操作需要輸入兩個數值,一箇舊值(期望操作前的值)和一個新值,在操作期間先比較下在舊值有沒有發生變化,如果沒有發生變化,才交換成新值,發生了變化則不交換
CPU流水線 CPU pipeline CPU流水線的工作方式就象工業生產上的裝配流水線,在CPU中由5~6個不同功能的電路單元組成一條指令處理流水線,然後將一條X86指令分成5~6步後再由這些電路單元分別執行,這樣就能實現在一個CPU時鐘週期完成一條指令,因此提高CPU的運算速度
記憶體順序衝突 Memory order violation 記憶體順序衝突一般是由假共享引起,假共享是指多個CPU同時修改同一個快取行的不同部分而引起其中一個CPU的操作無效,當出現這個記憶體順序衝突時,CPU必須清空流水線

2.2、處理器如何實現原子操作

32位 IA-32 處理器使用基於對快取加鎖或匯流排加鎖的方式來實現多處理器之間的原子操作。

2.2.1   處理器自動保證基本記憶體操作的原子性

首先處理器會自動保證基本的記憶體操作的原子性。處理器保證從系統記憶體當中讀取或者寫入一個位元組是原子的,意思是當一個處理器讀取一個位元組時,其他處理器不能訪問這個位元組的記憶體地址。奔騰6和最新的處理器能自動保證單處理器對同一個快取行裡進行16/32/64位的操作是原子的,但是複雜的記憶體操作處理器不能自動保證其原子性,比如跨匯流排寬度,跨多個快取行,跨頁表的訪問。但是處理器提供匯流排鎖定快取鎖定兩個機制來保證複雜記憶體操作的原子性

2.2.2   使用匯流排鎖保證原子性

第一個機制是通過匯流排鎖保證原子性。如果多個處理器同時對共享變數進行讀改寫(i++就是經典的讀改寫操作)操作,那麼共享變數就會被多個處理器同時進行操作,這樣讀改寫操作就不是原子的,操作完之後共享變數的值會和期望的不一致,舉個例子:如果 i = 1,我們進行兩次 i++ 操作,我們期望的結果是 3,但是有可能結果是 2。如下圖

原因是:有可能多個處理器同時從各自的快取中讀取變數 i,分別進行加1操作,然後分別寫入系統記憶體當中。那麼想要保證讀改寫共享變數的操作是原子的,就必須保證CPU1讀改寫共享變數的時候,CPU2不能操作快取了該共享變數記憶體地址的快取。

處理器使用匯流排鎖就是來解決這個問題的。所謂匯流排鎖就是使用處理器提供的一個LOCK#訊號,當一個處理器在總線上輸出此訊號時,其他處理器的請求將被阻塞住,那麼該處理器可以獨佔使用共享記憶體。

2.2.3 使用快取鎖保證原子性

第二個機制是通過快取鎖定保證原子性。在同一時刻我們只需保證對某個記憶體地址的操作是原子性即可,但匯流排鎖定把CPU和記憶體之間通訊鎖住了,這使得鎖定期間,其他處理器不能操作其記憶體地址的資料,所以匯流排鎖定的開銷比較大,最近的處理器在某些場合下使用快取鎖定代替匯流排鎖定來進行優化

頻繁使用的記憶體會快取在處理器的L1,L2和L3快取記憶體裡,那麼原子操作就可以直接在處理器內部快取中進行,並不需要宣告匯流排鎖。在奔騰6和最近的處理器中可以使用“快取鎖定”的方式來實現複雜的原子性。所謂“快取鎖定”就是如果快取在處理器快取行中記憶體區域在LOCK操作期間被鎖定,當它執行鎖操作回寫記憶體時,處理器不在總線上聲言LOCK#訊號,而是修改內部的記憶體地址,並允許它的快取一致性機制來保證操作的原子性。因為快取一致性機制會阻止同時修改被兩個以上處理器快取的記憶體區域資料,當其他處理器回寫已被鎖定的快取行的資料時會起快取行無效。在上例中,當CPU1修改快取行中的 i 使用快取鎖定,那麼CPU2就不能同時快取了 i 的快取行。

但是有兩種情況下處理器不會使用快取鎖定。第一種情況是:當操作的資料不能被快取在處理器內部,或操作的資料跨多個快取行(cache line),則處理器會呼叫匯流排鎖定。第二種情況是:有些處理器不支援快取鎖定。對於Inter486和奔騰處理器,就算鎖定的記憶體區域在處理器的快取行中也會呼叫匯流排鎖定。

以上兩個機制我們可以通過Inter處理器提供了很多LOCK字首的指令來實現。比如位測試和修改指令BTS,BTR,BTC,交換指令XADD,CMPXCHG和其他一些運算元和邏輯指令,比如ADD(加),OR(或)等,被這些指令操作的記憶體區域就會加鎖,導致其他處理器不能同時訪問它。

2.3、Java如何實現原子操作

在Java中可以通過迴圈CAS的方式來實現原子操作。

2.3.1、使用CAS實現原子操作

JVM中的CAS操作正是利用了上文中提到的處理器提供的CMPXCHG指令實現的。自旋CAS實現的基本思路就是迴圈進行CAS操作直到成功為止,以下程式碼實現了一個基於CAS執行緒安全的計數器方法safeCount和一個非執行緒安全的計數器count。

public class Counter {

	private AtomicInteger atomicI = new AtomicInteger(0);
	
	private int i = 0;
	
	public static void main(String[] args){
		
		final Counter cas = new Counter();
		
		List<Thread> ts = new ArrayList<Thread>(600);
		
		long start = System.currentTimeMillis();
		
		for (int j = 0; j < 100; j++) {
			
			Thread t = new Thread(new Runnable(){

				@Override
				public void run() {
					for (int i = 0; i < 10000; i++) {
						cas.count();
						cas.safeCount();
					}
				}
				
			});
			ts.add(t);
		}
		
		for(Thread t : ts){
			t.start();
		}
		
		// 等待所有執行緒執行完成
		for(Thread t : ts){
			try{				
				t.join();
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}
		System.out.println("非執行緒安全計數器:" + cas.i);
		System.out.println("執行緒安全計數器:" + cas.atomicI.get());
		System.out.println(System.currentTimeMillis() - start + "毫秒");
	}
	
	// 使用CAS實現執行緒安全計數器
	private void safeCount(){
		for(;;){
			int i = atomicI.get();
			boolean suc = atomicI.compareAndSet(i, ++i);
			
			if(suc){
				break;
			}
		}
	}
	
	// 非執行緒安全計數器
	private void count(){
		i++;
	}
}

執行結果:

從Java1.5開始JDK的併發包裡提供了一些類來支援原子操作,如AtomicBoolean(用原子方式更新的 boolean 值),AtomicInteger(用原子方式更新的 int 值),AtomicLong(用原子方式更新的 long 值),這些原子包裝類還提供了有用的工具方法,比如以原子的方式將當前值自增1和自減1。

在Java併發包中有一些併發框架也使用了自旋CAS的方式來實現原子操作,比如LinkedTransferQueue類的Xfer方法。CAS雖然很高效的解決原子操作,但是CAS仍然存在三大問題:ABA問題、迴圈時間長開銷大以及只能保證一個共享變數的原子操作。

  • 1.ABA問題

因為CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新。但是如果一個值原來是A,變成了B,後來又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變數前面追加上版本號,每次變數更新的時候把版本號加1,那麼A-B-A 就會變成1A - 2B-3A。

public boolean compareAndSet(
    
    V      expectedReference,  // 預期引用
    
    V      newReference,       // 更新後的引用

    int    expectedStamp,      // 預期標誌
    
    int    newStamp            // 更新後的標誌
)
  • 2.迴圈時間長開銷大

自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。如果JVM能支援處理器提供的pause指令,那麼效率會有一定的提升。pause指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它可以避免在退出迴圈的時候因記憶體順序衝突(memory order violation)而引起CPU流水線被清空(CPU pipeline flush),從而提高CPU的執行效率。

  • 3.只能保證一個共享變數的原子操作

當對一個共享變數執行操作時,我們可以使用迴圈CAS的方式來保證原子操作,但是對多個共享變數操作時,迴圈CAS就無法保證操作的原子性。這個時候就可以用,或者有一個取巧的辦法,就是把多個共享變數合併成一個共享變數來操作。比如有兩個共享變數 i=2,j = a,合併一下 ij = 2a,然後用CAS來操作 ij。從Java1.5開始JDK提供了AtomicReference類來保證引用物件之間的原子性,你可以把多個變數放在一個物件裡來進行CAS操作。

2.3.2、使用鎖機制實現原子操作

鎖機制保證了只有獲得鎖的執行緒能夠操作鎖定的記憶體區域。JVM內部實現了很多種鎖機制,有偏向鎖,輕量級鎖和互斥鎖。有意思的是除了偏向鎖,JVM實現鎖的方式都用到的迴圈CAS。當一個執行緒想進入同步塊的時候使用迴圈CAS的方式來獲取鎖,當它退出同步塊的時候使用迴圈CAS釋放鎖。詳細說明可以參見文章Java SE1.6中的Synchronized


三、原子變數

原子變數是類的小工具包,支援在單個變數上解除鎖的執行緒安全程式設計。事實上,此包中的類可以將volatile值、欄位和陣列元素的概念擴充套件到那些也提供了原子條件更新操作的類。

類AtomicBoolean、AtomicInteger、AtomicLong和AtomicReferennce的例項各自提供相應型別的單個變數的訪問和更新。每個類也為該型別提供適當的實用工具方法。

類AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray類進一步擴充套件了原子操作,對這些型別的陣列提供了支援。這些類在為其陣列元素提供volatile訪問語義方面也引人注目,這對於普通陣列來說是不受支援的。

核心方法:boolean  compareAndSet(expectedValue, updateValue);

Java.util.concurrent.atomic包下提供瞭如下常用的原子操作類:

1、AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference;

2、AtomicIntegerArray、AtomicLongArray;

3、AtomicMarkableReference;

4、AtomicReferenceArray;

5、AtomicStampedReference。


四、AtomicInteger原始碼解析

這裡以AtomicInteger原始碼為例進行解析,看其內部如何實現的CAS演算法來保證原子性的。

先挑幾個比較重要的方法出來進行單獨講解:

  • getAndIncrement()方法
public final int getAndIncrement() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}

getAndIncrement() 相當於int型別中的 i++ 操作。

這個方法的做法是:先獲取到當前的 value 屬性值,然後將 value 加 1,賦值給一個區域性的 next 變數。然而,這兩步都是非執行緒安全的,但是內部有一個死迴圈,不斷去做compareAndSet操作,直到成功為止。也就是值修改的根本在compareAndSet方法裡面。

其他的幾個方法類似,比如:getAndDecrement()、getAndAdd(int delta)、incrementAndGet()、decrementAndGet()、addAndGet(int delta)。這些方法的核心都在於:不斷去做compareAndSet操作,直到成功為止

  • compareAndSet(int expect, int update)
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

CAS演算法:通過對比“valueOffset上的value”與expect是否相同,來決定是否修改value值為update值。

compareAndSet所做的實際上是:呼叫 Sun 的 UnSafe 的 compareAndSwapInt 方法來完成的。此方法為 native 方法,compareAndSwapInt 基於的是CPU 的 CAS指令來實現的。所以基於 CAS 的操作可認為是無阻塞的,一個執行緒的失敗或掛起不會引起其它執行緒也失敗或掛起。並且由於 CAS 操作的是 CPU 原語,所以效能比較好。

  • getAndSet(int newValue)
public final int getAndSet(int newValue) {
    for (;;) {
        int current = get();
        if (compareAndSet(current, newValue))
            return current;
    }
}

getAndSet(int newValue)方法:主要是用來獲取舊值的,不管舊值和預期值是否相等,都會將舊值返回。

  • get()
public final int get() {
    return value;
}

get()方法:主要是用來獲取當前值的。

  • set(int newValue)
public final void set(int newValue) {
    value = newValue;
}

set(int newValue)方法:給value設定一個新值。

  • 完整的AtomicInteger原始碼:
package java.util.concurrent.atomic;
import sun.misc.Unsafe;

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    // 構造器:用initialValue建立一個AtomicInteger
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger() {
    }

    // 獲取當前的value值
    public final int get() {
        return value;
    }

    // 給value設定一個新值
    public final void set(int newValue) {
        value = newValue;
    }

    
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    
	// 獲取舊值
    public final int getAndSet(int newValue) {
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    /*compareAndSet所做的實際上是:呼叫 Sun 的 UnSafe 的 compareAndSwapInt 方法來完成的,
	此方法為 native 方法,compareAndSwapInt 基於的是CPU 的 CAS指令來實現的。
	所以基於 CAS 的操作可認為是無阻塞的,一個執行緒的失敗或掛起不會引起其它執行緒也失敗或掛起。
	並且由於 CAS 操作是 CPU 原語,所以效能比較好。
	*/
	// CAS演算法:通過對比“valueOffset上的value”與expect是否相同,來決定是否修改value值為update值。
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

   
    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /*i++:這個方法的做法為先獲取到當前的 value 屬性值,然後將 value 加 1,
	賦值給一個區域性的 next 變數,然而,這兩步都是非執行緒安全的,但是內部有一個死迴圈,
	不斷去做compareAndSet操作,直到成功為止,也就是修改的根本在compareAndSet方法裡面
	*/
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    // 自減 i--
    public final int getAndDecrement() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    // 加法
    public final int getAndAdd(int delta) {
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    // ++i
	public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    // --i
    public final int decrementAndGet() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    
    public final int addAndGet(int delta) {
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return next;
        }
    }

    
    public String toString() {
        return Integer.toString(get());
    }


    public int intValue() {
        return get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}

五、模擬CAS演算法

/**模擬 CAS 演算法*/
public class TestCompareAndSwap {

	public static void main(String[] args) {
		
		final CompareAndSwap cas = new CompareAndSwap();

		for (int i = 0; i < 10; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					int expectedValue = cas.get();
					boolean b = cas.compareAndSet(expectedValue, (int) (Math.random() * 101));
					System.out.println(b);
				}
			}).start();
		}
	}
}

class CompareAndSwap {
	private int value;

	// 獲取記憶體值
	public synchronized int get() {
		return value;
	}

	// 比較
	public synchronized int compareAndSwap(int expectedValue, int newValue) {
		int oldValue = value;

		if (oldValue == expectedValue) {
			this.value = newValue;
		}

		return oldValue;
	}

	// 設定
	public synchronized boolean compareAndSet(int expectedValue, int newValue) {
		return expectedValue == compareAndSwap(expectedValue, newValue);
	}
}

上一篇:volatile關鍵字詳解:https://blog.csdn.net/pcwl1206/article/details/84881395

參考及推薦:

1、CAS操作:https://blog.csdn.net/a724888/article/details/60871077