1. 程式人生 > >階段二,4,無鎖

階段二,4,無鎖

無鎖:無障礙執行。

實現原理:cas演算法實現 

為什麼要cas

在多執行緒高併發程式設計的時候,最關鍵的問題就是保證臨界區的物件安全訪問。通常是加鎖來處理,其實加鎖的本質是將併發轉化成序列來實現,勢必會影響吞吐量,而且執行緒的數量是有限的,依賴於作業系統,而且執行緒的建立和銷燬帶來的效能損耗是不可忽略掉的,雖然現在基本上都是用執行緒池,來儘可能的降低,執行緒帶來的效能損耗。

對於併發控制而言,鎖是一種悲觀策略,會阻塞執行緒執行。而無鎖是一種樂觀策略,他會假設對資源訪問時沒有衝突,既然沒有

衝突就不需要等待。執行緒就會阻塞。那麼多執行緒訪問臨界區資源怎麼處理?無鎖策略採用了一種比較交換技術CAS(compare and swap),來鑑定執行緒衝突,一旦檢測到衝突,就充實當前操作指導沒有衝突為至。

與鎖相比,CAS會使程式設計比較負責,但是由於其優越的效能優勢,以及先天免疫死鎖(根本就沒有鎖,當然就不會有執行緒一直在堵塞了),更為重要的是,使用無鎖的方式沒有鎖競爭帶來的開銷,也沒有執行緒頻繁排程帶來的開銷,他比基於鎖方式更有優越的效能,所以被目前廣泛應用,我們在設計程式時也可適當的使用。

不過cas的編碼確實稍微複雜,而且jdk作者本身也不希望你直接使用unsafe進行程式碼的編寫,所以如果不能深刻理解cas以及unsafe,還要謹慎使用

cas原理分析

cas演算法

一個cas方法包括三個引數CAS(V,E,N。v表示要更新的變數,e表示預期的變數,n表示新值,只有當v等於e時,才能將v更新為n。如果v的值不等於e,說明已被其他執行緒修改,當前執行緒可以放棄此操作,也可以在此嘗試此操作直到修改成功。基於這樣的演算法,cas即使沒有鎖,也可以發現其他執行緒對當前執行緒的干擾(臨界區值的修改),並進行恰當的處理

cpu併發特性cas,volatile

1.cas:比較和替換是設計併發演算法時用到的一種技術。簡單來說,比較和替換是使用一個期望值和一個變數的當前值進行比較,如果當前變數的值與我們期望的值相等,就使用一個新值替換當前變數的值。cas是一種系統原語(所謂原語屬於作業系統用於範圍,原語由若干指令組成,用於完成一定功能的過程。原語的執行必須是連續的,在執行過程中不允許中斷)。

在X86平臺上,CPU提供了在指令執行時期對匯流排加鎖的手段。CPU晶片上有一條引線#HLOCKpin,如果組合語言的程式中在與一條指令前加上字首“LOCK”,經過彙編後以後的機器程式碼就使CPU在執行這條指令的時候把#HLOCK pin的電位拉低,持續到這條指令結束時放開,從而把匯流排鎖住,這樣同一總線上的cpu就暫時不能通過匯流排訪問記憶體了,保證了這條指令在多核處理器環境中的原子性。

AtomicInteger

初次接觸CAS的人一般都是通過AtomicInteger這個類來了解的,這裡講其原理也藉助這個類。

AtomicInteger的原始碼:

private volatile int value;

//此處省略一萬字程式碼

/**

 * Atomically setsto the given value and returns the old value.

 *

 * @param newValuethe new value

 * @return theprevious value

 */

public final int getAndSet(int newValue) {

    for (;;) {

        int current= get();

        if(compareAndSet(current, newValue))

            returncurrent;

    }

}

/**

 * Atomically setsthe value to the given updated value

 * if the currentvalue {@code ==} the expected value.

 *

 * @param expectthe expected value

 * @param updatethe new value

 * @return true ifsuccessful. False return indicates that

 * the actual valuewas not equal to the expected value.

 */

public final boolean compareAndSet(int expect, intupdate) {

    returnunsafe.compareAndSwapInt(this, valueOffset, expect, update);

}

通過這段程式碼可知

-AutomicInteger中真正儲存資料的是value變數,而改變變數的是被volatile修飾的,保證執行緒直接的可見性。還記的

Integer中的value值嗎?Integerzh中的value是被final修飾的,是不可變物件。

-getAndSet方法通過一個死迴圈不斷嘗試賦值操作。而真正的賦值操作交給了unsafe類來實現。

AutomaticInteger的使用

在java語言中,++i和i++並不是執行緒安全的,在使用的時候,不可避免的會用到synchronized關鍵字,而automicInteger

則通過一種執行緒安全加減操作介面。

package TestAtomicInteger;
 
import java.util.concurrent.atomic.AtomicInteger;
 
class MyThread implements Runnable {
//    static  int i = 0;
     static AtomicInteger ai=new AtomicInteger(0);
      
 
    public void run() {
        for (int m = 0; m < 1000000; m++) {
            ai.getAndIncrement();
        }
    }
};
 
public class TestAtomicInteger {
    public static void main(String[] args) throws InterruptedException {
        MyThread mt = new MyThread();
 
        Thread t1 = new Thread(mt);
        Thread t2 = new Thread(mt);
        t1.start();
        t2.start();
        Thread.sleep(500);
        System.out.println(MyThread.ai.get());
    }
}
可以發現的結果都是2000000,也就是說AutomaticInteger是執行緒安全的。

值得一看的。這裡,我們來看看AtomicInteger是如何使用非阻塞演算法來實現併發控制的。

AtomicInteger 的關鍵域只有三個:

// setup to use Unsafe.compareAndSwapInt for updates  
private static final Unsafe unsafe = Unsafe.getUnsafe();  
private static final long valueOffset;  
private volatile int value; 

這裡,unsafe是java提供的獲得物件記憶體地址訪問的類,他的作用就是在更新操作時提供“比較並替換”的作用。實際上就是

AutomicInteger中的一個工具。valueOffset是用來記錄value本身在記憶體的編譯地址的。方便比較。

value值是用來儲存整數的時間變數, 這裡被宣告為volatile,就是為了保證在更新操作時,當前執行緒可以拿到value的最新值(

併發環境下,value可能已經被其他執行緒更新了)

我們自增程式碼為例,可以看到這個併發控制的核心演算法:

/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
//這裡可以拿到value的最新值
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}

public final boolean compareAndSet(int expect, int update) {
//使用unsafe的native方法,實現高效的硬體級別CAS
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

Unsafe

上面可知,unsafe類是cas的核心。

從名字可知,這個類標記為不安全的,jdk作者不希望使用者使用這個類,我們看一下他的構造方法。

public static Unsafe getUnsafe() {

Classvar0 = Reflection.getCallerClass();

if(var0.getClassLoader() != null) {

thrownew SecurityException("Unsafe");

} else {

returntheUnsafe;

}

}

如果ClassLoader不是null,直接丟擲異常了,我們沒辦法在應用程式中使用這個類

public static void main(String[] args){

Unsafeunsafe = Unsafe.getUnsafe();

}

main方法執行結構

Exception in thread "main"java.lang.SecurityException: Unsafe

atsun.misc.Unsafe.getUnsafe(Unsafe.java:90)

atcom.le.luffi.Tewast.main(Tewast.java:13)

atsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

atjava.lang.reflect.Method.invoke(Method.java:606)

atcom.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

我們在看一下compareAndSet的方法宣告:

public final native booleancompareAndSwapInt(Object var1, long var2, int var4, int var5);

第一個引數是給定的物件,offset是物件內的偏移量(其實就是一個欄位到物件頭部的偏移量,通過這個偏移量可以快

速定位欄位),第三個引數是 期望值,最後一個是要設定的值。

其中unsafe封裝了一些類似於c++中指標的東西,該類的方法都是native的,而且是原子的操作。原子性是通過cas指令

實現的,由處理器保證。

unsafe類,不屬於java標準,但是很多java的基礎類庫,包括一些被廣泛使用的高效能開發都是基於Unsafe類開發的,比

如netty,cassandra,hadoop,kafka等,unsafe類在提升java執行效率,增強java語言底層操作能力方面起到了很大的作用

 unsafe類使java擁有了想c語言的指標一樣的操作能力,同時也帶來了指標問題。過度使用Unsafe類會使出錯的機率變大

,因此java9之後去掉了unsafe類。

 unsafe類使用了單例模式,需要一個靜態方法getUnsafe()來獲取。但是Unsafe類做了限制,如果普通呼叫的話,它會

丟擲SecurityException異常,只有由主類載入器載入的類才能呼叫這個方法,其原始碼如下:

public static Unsafe getUnsafe() {
    Class var0 = Reflection.getCallerClass();
    if(!VM.isSystemDomainLoader(var0.getClassLoader())) {
        throw new SecurityException("Unsafe");
    } else {
        return theUnsafe;
    }
}

AtomicReference介紹

 atomicReference是作用於物件進行的原子性操作。 在jdk7中AtomicReference原始碼如下:
dsf
public class AtomicReference<V>  implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;

    // 獲取Unsafe物件,Unsafe的作用是提供CAS操作
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicReference.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    // volatile型別
    private volatile V value;

    public AtomicReference(V initialValue) {
        value = initialValue;
    }

    public AtomicReference() {
    }

    public final V get() {
        return value;
    }

    public final void set(V newValue) {
        value = newValue;
    }

    public final void lazySet(V newValue) {
        unsafe.putOrderedObject(this, valueOffset, newValue);
    }

    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

    public final boolean weakCompareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

    public final V getAndSet(V newValue) {
        while (true) {
            V x = get();
            if (compareAndSet(x, newValue))
                return x;
        }
    }

    public String toString() {
        return String.valueOf(get());
    }
}<span style="font-family: 'Courier New' !important; font-size: 12px !important; line-height: 1.5 !important; color: rgb(0, 0, 0);"></span>
說明:AtomicReference的原始碼比較簡單,他通過“volicate”和“unsafe”提供CAS函式實現“原子操作”(01)value 是volatile型別。這保證了:當某執行緒修改value值時,其他執行緒看到的value值都是最新的value值。(02)通過CAS設定value。這保證了:當某執行緒池通過CAS函式(如compareAndSet函式)設定value時,他操作是原子的,即執行緒在操作value時不會被中斷。

AtomicReference示例

// AtomicReferenceTest.java的原始碼  
import java.util.concurrent.atomic.AtomicReference;  
  
public class AtomicReferenceTest {  
      
    public static void main(String[] args){  
  
        // 建立兩個Person物件,它們的id分別是101和102。  
        Person p1 = new Person(101);  
        Person p2 = new Person(102);  
        // 新建AtomicReference物件,初始化它的值為p1物件  
        AtomicReference ar = new AtomicReference(p1);  
        // 通過CAS設定ar。如果ar的值為p1的話,則將其設定為p2。  
        ar.compareAndSet(p1, p2);  
  
        Person p3 = (Person)ar.get();  
        System.out.println("p3 is "+p3);  
        System.out.println("p3.equals(p1)="+p3.equals(p1));  
    }  
}  
  
class Person {  
    volatile long id;  
    public Person(long id) {  
        this.id = id;  
    }  
    public String toString() {  
        return "id:"+id;  
   
執行結果
  1. p3 is id:102
  2. p3.equals(p1)=false
結果說明
新建AtomicReference物件ar時,將它初始化為p1。
緊接著,通過CAS函式對它進行設定。如果ar的值為p1的話,則將其設定為p2。
最後,獲取ar對應的物件,並列印結果。p3.equals(p1)的結果為false,這是因為Person並沒有覆蓋equals()方法,而是採用繼承自Object.java
的equals()方法;而Object.java中的equals()實際上是呼叫"=="去比較兩個物件,即比較兩個物件的地址是否相等。

AtomicStampedReference

 AtomicReference在修改過程中,丟失了狀態資訊。物件值本身與狀態被畫上了等號。因此,我們只要能夠記錄物件在修改過程中的狀態值,就可以很好的解決物件被反覆修改導致執行緒無法正確判斷物件狀態的問題。

AtomicStampedReference正式這麼做的。他內部不僅維護了物件值,還維護了一個時間戳(實際上他可以使任何一個整數,它使用整數來表示狀態值)。當AtomicStampedReference對應的數值被修改時,除了更新資料本身外,他必須要更新時間戳。當AtomicStampedReference設定物件時,物件值以及時間戳都必須滿足期望值,寫入才會成功。因此,即使物件值被反覆讀寫,寫回原值,只要時間戳發生變化,就能防止不恰當的寫入。

AtomicStampedReference的幾個API在AtomicReference的基礎上新填了有關時間戳的資訊:

//比較設定 引數依次為:期望值 寫入新值 期望時間戳 新時間戳
public boolean compareAndSet(V expectedReference,V  
newReference,int expectedStamp,int newStamp)
//獲得當前物件引用
public V getReference()
//獲得當前時間戳
public int getStamp()
//設定當前物件引用和時間戳
public void set(V newReference, int newStamp)
使用AtomicStampedReference在修正那個貴賓卡充值的問題的:
 public class AtomicStampedReferenceDemo {
 static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0);
   public static void main(String[] args) {
       //模擬多個執行緒同時更新後臺資料庫,為使用者充值
        for(int i = 0 ; i < 3 ; i++) {
            final int timestamp=money.getStamp();
            newThread() {  
               public void run() { 
                   while(true){
                       while(true){
                            Integerm=money.getReference();
                             if(m<20){
                         if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){
          System.out.println("餘額小於20元,充值成功,餘額:"+money.getReference()+"元");
                                    break;
                                }
                            }else{
                               //System.out.println("餘額大於20元,無需充值");
                                break ;
                            }
                       }
                    }
                } 
            }.start();
         }
        
        //使用者消費執行緒,模擬消費行為
        new Thread() { 
             publicvoid run() { 
                for(int i=0;i<100;i++){
                   while(true){
                       int timestamp=money.getStamp();
                        Integer m=money.getReference();
                        if(m>10){
                            System.out.println("大於10元");
                          if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){
                       System.out.println("成功消費10元,餘額:"+money.getReference());
                                 break;
                             }
                       }else{
                            System.out.println("沒有足夠的金額");
                             break;
                       }
                   }
                   try {Thread.sleep(100);} catch (InterruptedException e) {}
                }
            } 
       }.start(); 
    }
}
執行上述程式碼,可以得到以下輸出:
餘額小於20元,充值成功,餘額:39元
大於10元
成功消費10元,餘額:29
大於10元
成功消費10元,餘額:19
大於10元
成功消費10元,餘額:9
沒有足夠的金額
可以看到,賬戶只被贈予了一次。

AtomicIntegerArray

當前可使用的原子陣列有:AtomicIntegerArray,AtomicLongArray,和AtomicReferenceArray,分別表示整數陣列,long型陣列和普通的物件陣列。

    以AtomicIntegerArray為例,展示原子陣列的使用方式。

AtomicIntegerArray本質上是對int[]型別的封裝。使用Unsafe類通過CAS的方式控制int[]在多執行緒下的安全性。它提供了以下幾個核心API:
//獲得陣列第i個下標的元素
public final int get(int i)
//獲得陣列的長度
public final int length()
//將陣列第i個下標設定為newValue,並返回舊的值
public final int getAndSet(int i, int newValue)
//進行CAS操作,如果第i個下標的元素等於expect,則設定為update,設定成功返回true
public final boolean compareAndSet(int i, int expect, intupdate)
//將第i個下標的元素加1
public final int getAndIncrement(int i)
//將第i個下標的元素減1
public final int getAndDecrement(int i)
//將第i個下標的元素增加delta(delta可以是負數)
public final int getAndAdd(int i, int delta)

簡單的示例,展示AtomicIntegerArray使用:

public class AtomicIntegerArrayDemo {
	 static AtomicIntegerArray arr = new AtomicIntegerArray(10);

	 
	      public static class AddThread implements Runnable{
	          public void run(){
	             for(int k=0;k<10000;k++)
	                  arr.getAndIncrement(k%arr.length());
	          }
	      }
	     public static void main(String[] args) throws InterruptedException {
	          Thread[]ts=new Thread[10];
	          for(int k=0;k<10;k++){
	             ts[k]=new Thread(new AddThread());
	          }
	          for(int k=0;k<10;k++){ts[k].start();}
	          for(int k=0;k<10;k++){ts[k].join();}
	          System.out.println(arr);
	     }
}

上述程式碼第2行,申明瞭一個內含10個元素的陣列。第3行定義的執行緒對陣列內10個元素進行累加操作,每個元素各加1000次。第11行,開啟10個這樣的執行緒。因此,可以預測,如果執行緒安全,陣列內10個元素的值必然都是10000。反之,如果執行緒不安全,則部分或者全部數值會小於10000。   

程式的輸出結果如下: 

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000,10000, 10000]

這說明AtomicIntegerArray確實合理地保證了陣列的執行緒安全性。

AtomicIntegerFieldUpdater

對於atomicIntegerFieldUpdater的使用稍微有一些約束,約束如下:

(1)欄位必須是volatile型別的,線上程之間共享 變數是保證立即可見性eg:volatile int value = 3

(2)欄位的描述型別(修飾符public/protected/default/peivate)是與呼叫者與操作物件欄位的關係一致。也就是說呼叫者能夠直接操作物件欄位,那麼就可以反射進行原子操作。但是對於父類的欄位,子類是不能直接操作的,儘管子類可以訪問父類的欄位。

(3)只能是例項變數,不能是類變數,也就是說不能加static關鍵字。

(4)只能是可修改變數,不能是final變數,因為final的語義就是不可修改。實際上final的語義和volatile是有衝突的,這兩個關鍵字不能同時存在。

(5)對於AtomicIntegerFieldUpdater和AtomicLongFieldUpdate只能修改int/long型別的欄位,不能修改其包裝型別(integer/long)。如果要修改包裝型別就需要使用AtomicReferenceFieldUpdater。

 public class TestAtomicIntegerFieldUpdater {

        public static void main(String[] args){
            TestAtomicIntegerFieldUpdater tIA = new TestAtomicIntegerFieldUpdater();
            tIA.doIt();
        }

        public AtomicIntegerFieldUpdater<DataDemo> updater(String name){
            return AtomicIntegerFieldUpdater.newUpdater(DataDemo.class,name);

        }

        public void doIt(){
            DataDemo data = new DataDemo();
            System.out.println("publicVar = "+updater("publicVar").getAndAdd(data, 2));
        }

    }

    class DataDemo{
        public volatile int publicVar=3;
        protected volatile int protectedVar=4;
        private volatile  int privateVar=5;

        public volatile static int staticVar = 10;
        //public  final int finalVar = 11;

        public volatile Integer integerVar = 19;
        public volatile Long longVar = 18L;

    }