1. 程式人生 > 其它 >樂觀鎖-無鎖併發

樂觀鎖-無鎖併發

技術標籤:多執行緒java多執行緒

文章目錄

無鎖

問題提出

一個賬戶內有10000,1000個執行緒每個執行緒減去10,最後正確的結果應該是0。

public interface Account {
    Integer getAmount();
    void withdraw(Integer amount);

    public static void demo(Account account)
{ ArrayList<Thread> threadList = new ArrayList<>(); long start = System.nanoTime(); for(int i = 0;i < 1000; i++){ Thread t = new Thread(()->{ account.withdraw(10); }); threadList.add(t); } threadList.
forEach(Thread ::start); //遍歷1000個執行緒,並呼叫start方法啟動 threadList.forEach(t->{ try { t.join(); //保證1000個執行緒在主執行緒之前執行完畢以確保執行時間的正確 } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime
(); System.out.println(account.getAmount()+"cost:"+TimeUnit.NANOSECONDS.toMillis(end-start)+"ms"); } }
為什麼不安全
class SafeAccount implements Account{
    private Integer amount;
    public  SafeAccount(Integer amount) {
        this.amount = amount;
    }
    @Override
    public Integer getAmount() {
        return amount;
    }
    @Override
    public  void withdraw(Integer amount) {
        this.amount -= amount;
    }

    public static void main(String[] args) {
        Account account = new SafeAccount(10000);
        Account.demo(account);
    }
}

image-20210121205935443

最後結果並不是0,發生錯誤的原因是指令交錯的原因造成的,比如執行緒一獲得amout開始-10操作時另一個執行緒也開始同樣的操作,使得結果發生錯誤。

解決思路-鎖

使用synchronized關鍵字保護共享變數amount,當一個執行緒完成了-10操作並且釋放了鎖其他執行緒才能執行,讓執行緒之間序列執行。這樣不會造成指令交錯,確保了操作的原子性

public synchronized void withdraw(Integer amount) {	
    this.amount -= amount;
}
解決思路-無鎖
class UnlockAccount implements Account{
    private AtomicInteger balance;	//使用原子整數
    publicnlockAccount(Integer amount) {
        balance = new AtomicInteger(amount);
    }
    @Override
    public Integer getAmount() {
        return balance.get();
    }
    @Override
    public void withdraw(Integer amount) {
        while (true){
            int prev = balance.get();	//獲取舊值
            int next = except - amount;	//操作後的新值
          //將舊值prev與此時的最新值比較如果相等則將最新設定為next返回true並結束迴圈,如果不相等就返回false繼續嘗試此操作直到成功。
            if (balance.compareAndSet(prev,next))   
                break;
        }
    }
    public static void main(String[] args) {
        Account account = new unlockAccount(10000);
        Account.demo(account);
    }
}

image-20210121215443270

無鎖方式可以允許指令的交錯,但是當發生指令交錯後會讓這次操作失效,繼續嘗試直到成功。

CAS與volatile

CAS

使用原子整數在不加鎖的情況下可以實現對共享變數安全的操作。

以下是AtomicInteger的部分原始碼

public class AtomicInteger extends Number implements java.io.Serializable {
      public AtomicInteger(int initialValue) {
        value = initialValue;	//給value賦初始值
    }
 //獲取Unsafe物件,提供了非常底層的,操作執行緒,記憶體的方法。
  private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
  //獲得value屬性在AtomicInteger中的偏移量,用來定位value的地址,直接操作它。
  valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
  //將value設定為volatile確保執行緒間的可見性
  private volatile int value;
  public final boolean compareAndSet(int expect, int update) {
    /*通過vaueOffet定位到value並且比較和舊值expect是否相等。
    如果相等將value值設定為update
    如果不等表示這次操作失敗,返回false。
     */ 
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update); //這個操作是原子性的,不會被其他執行緒干擾
    }
}

其中關鍵的就是compareAndSet,它的簡稱就是CAS,它屬於原子操作。CAS的底層是lock cmpxchg指令(X86架構),在單核和多核CPU下都能保證【比較-交換】的原子性。

volatile

獲取共享變數時為了保證變數的可見性需要使用volatile修飾,保證一個執行緒對變數的修改另一個執行緒可見。CAS操作必須藉助volatile才能讀取共享變數的最新值保證【比較-交換】的有效性。

為什麼無鎖效率高
  • 當cas操作不成功時使用while(true)迴圈不斷嘗試,該執行緒一直處於running狀態不會停歇。而synchronized會讓沒獲得鎖的執行緒陷入等待,發生上下文切換。
  • 無鎖情況下要保持執行緒不斷的執行需要多個cpu的支援,如果cpu數小於執行緒數,雖然不會陷入阻塞,但由於沒有分到時間片,仍然會進入Runnable狀態,還是會導致上下文切換。所以在多核cpu下才由效果(執行緒數<cpu數)。

樂觀鎖和悲觀鎖

  • CAS基於樂觀鎖的思想:最客觀的估計,在當前執行緒執行時不怕別的執行緒來修改共享變數,即使修該了也沒關係,可以再繼續重試
  • synchronized 基於悲觀鎖的思想:最悲觀的估計,在我執行時就上鎖不給其他執行緒修改共享變數的機會,只有我執行完解鎖,其他執行緒才可以修改共享變數。
  • CAS體現的是無鎖併發無阻塞併發
    • 沒有使用synchronized,執行緒不會陷入阻塞,這時效率提升的因素之一。
    • 但是如果競爭激烈,重試頻繁發生,反而影響效率。

原子類

1、原子引用類
  • AtomicReference
  • AtomicMarkableReference 給共享變數一個標記,記錄是否被修改過
  • AtomicStampedReference 給共享變數附加一個版本號,記錄被修改過的次數
2、ABA

多個執行緒將共享變數的值由A–>B,再由B–>A。其他執行緒在操作時不會知道共享變數是否被修改過。

static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
    String prev = ref.get();
    other();
    Thread.sleep(2000);
    log.debug("change A->B{}",ref.compareAndSet(prev,"B"));
}
public static void other(){
    new Thread(()->{
        log.debug("change A->B {}",ref.compareAndSet(ref.get(),"B"));
    },"t1").start();

    new Thread(()->{
        log.debug("change B->A {}",ref.compareAndSet(ref.get(),"A"));
    },"t2").start();
}

image-20210127155531103

如果主執行緒希望只要其他執行緒修改了共享變數,那麼就算自己CAS失敗,這樣僅比較值是不夠的,需要再加一個版本號。

AtomicStampedReference

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
    String prev = ref.getReference();
    int stamp = ref.getStamp();
    log.debug("start:{},stamp:{}",prev,stamp);
    other();
    Thread.sleep(2000);
    log.debug("change A->B{}",ref.compareAndSet(prev,"B",stamp,ref.getStamp()+1));
}
public static void other(){
    new Thread(()->{
        log.debug("change A->B {},stamp:{}",ref.compareAndSet(ref.getReference(),"B",ref.getStamp(),ref.getStamp()+1),ref.getStamp());
    },"t1").start();

    new Thread(()->{
        log.debug("change B->A {},stamp:{}",ref.compareAndSet(ref.getReference(),"A",ref.getStamp(),ref.getStamp()+1),ref.getStamp());
    },"t2").start();
}

image-20210127160005417

如果只關心是否被修改過,並不關係修改過幾次,可以使用

AtomicMarkableReference

static AtomicMarkableReference<String> ref = new AtomicMarkableReference<>("A",false);	//初始值設為false,即沒有被修改過。
public static void main(String[] args) throws InterruptedException {
    String prev = ref.getReference();
    other();
    Thread.sleep(2000);
    log.debug("change A->B {}",ref.compareAndSet(prev,"B",false,true));
}
public static void other(){
    new Thread(()->{
        log.debug("change A->B {}",ref.compareAndSet(ref.getReference(),"B",false,true));
    },"t1").start();

    new Thread(()->{
        log.debug("change B->A {}",ref.compareAndSet(ref.getReference(),"A",true,true));
    },"t2").start();
}

image-20210127160642893

3、原子陣列
/*10個執行緒對長度為10的陣列的每個元素分別進行1000次累加操作,執行緒安全的情況下陣列每個元素值是1000
引數1,提供陣列。原子陣列或普通陣列
引數2,獲得陣列長度的方法
引數3,自增方法,回傳array,index
引數4,列印陣列的方法
*/
public class SafeArray {
    public static <T> void demo(
            Supplier<T> arraySupplier,
            Function<T, Integer> lenFun,
            BiConsumer<T, Integer> putConsumer,
            Consumer<T> printConsumer
    ) {
        ArrayList<Thread> threadList = new ArrayList<>();
        T array =  arraySupplier.get();
        int len = lenFun.apply(array);
        for(int i = 0; i < len; i++){
            threadList.add(new Thread(()->{
                for(int j = 0; j < 1000; j++) {
                    putConsumer.accept(array,j%len);
                }
            }));
        }

        threadList.forEach(Thread::start);//啟動所有的執行緒
        threadList.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        printConsumer.accept(array);
    }

}
public static void main(String[] args) {
    SafeArray.demo(
            ()-> new AtomicIntegerArray(10),
            (array)->array.length(),
            (array,index)->array.getAndIncrement(index),	//累加操作
            (array) -> System.out.println(array)
    );
}

image-20210127183035497

Unsafe

Unsafe物件提供了非常底層的,操作記憶體,執行緒的方法,不能直接呼叫,只能通過反射獲取。

自定義一個原子類:

class AtomicData{

    //使用volatile來保證可見性
    private volatile Integer data;
    private static Unsafe unsafe;
    //data屬性的偏移量,Unsafe可以通過偏移地址直接操作data
    final static long OFFSET;

    public AtomicData(Integer data) {
        this.data = data;
    }

    static {
        unsafe =  UnsafeAccessor.getUnsafe();
        try {
            OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }

    public int getAndUpdate(ResultOperator resultOperator){
        int prev,update;
        do{
            prev = get();
            update = resultOperator.applyAsInt(prev);
            System.out.println(update);
        }while (!(unsafe.compareAndSwapInt(this,OFFSET,prev,update)));
        return prev;
      
    }
    public final int get(){
        return data;
    }

}