1. 程式人生 > 其它 >併發基礎知識補全和CAS基本原理

併發基礎知識補全和CAS基本原理

技術標籤:Javajava併發程式設計多執行緒

併發基礎知識補全

Callable、Future和FutureTask

在前文(執行緒基礎、執行緒之間的共享與協作)中提到過中,新啟執行緒的方式只有兩種,一種就是擴充套件自Thread類,然後重寫run()方法,另一種就是實現Runnable介面,實現run()方法。

那麼Callable介面這種方式,又是怎麼回事呢。我們先來觀察Thread類中的構造方法,並沒有可以接受一個callable這種引數的構造方法。我們使用Callable的時候,首先要把它包裝成FutureTask,而它又實現了RunnableFuture介面。

public
class FutureTask<V> implements RunnableFuture<V> { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
public void run() { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); }catch (Throwable ex) {} } } }

RunnableFuture介面實際上又是繼承了RunnableFuture介面。也就是說到底,Callable

交給執行緒去執行的時候,實際上還是包裝成了Runnable交給執行緒去執行。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

阻塞方法sleep()和wait()的區別

之前提到執行緒生命週期的時候,無論是sleep還是wait(),都會進入阻塞狀態,但是如果細分的話,雖然兩者都會暫停執行緒的執行,實際上兩者進入的執行緒狀態是不一樣的。
我們先來看sleep()的原始碼:

/**
     * Causes the currently executing thread to sleep (temporarily cease
     * execution) for the specified number of milliseconds, subject to
     * the precision and accuracy of system timers and schedulers. The thread
     * does not lose ownership of any monitors.
     */
    public static native void sleep(long millis) throws InterruptedException;

sleep()方法是Thread類中的一個native方法,它會在指定的時間內阻塞執行緒的執行。而且從其註釋中可知,並不會失去對任何監視器(monitors)的所有權,也就是說不會釋放鎖,僅僅會讓出cpu的執行權。

我們再來看wait()的原始碼:
無論是wait(),還是wait(long timeout, int nanos)走的都是native的wait()方法。

/**
* This method should only be called by a thread that is the owner
* of this object's monitor. See the {@code notify} method for a
* description of the ways in which a thread can become the owner of
* a monitor.
*/
public final native void wait(long timeout) throws InterruptedException;

根據註釋可以看出,此方法呼叫的前提是當前執行緒已經獲取了物件監視器monitor的所有權。

該方法會呼叫後不僅會讓出cpu的執行權,還會釋放鎖(即monitor的所有權),並且進入wait set中,直到其他執行緒呼叫notify()或者notifyall()方法,或者指定的timeout到了,才會從wait set中出來,並重新競爭鎖。

區別
兩者最主要的區別就是釋放鎖(monitor的所有權)與否,但是兩個方法都會丟擲InterruptedException。

執行緒阻塞BLOCKED和等待WAITING的區別

阻塞BLOCKED:
阻塞表示執行緒在等待物件的monitor鎖,試圖通過synchronized去獲取某個鎖,但是此時其他執行緒已經獨佔了monitor鎖,那麼當前執行緒就會進入等待狀態。

等待WAITING
當前執行緒等待其他執行緒執行某些操作,典型場景就是生產者消費者模式,在任務條件不滿足時,等待其他執行緒的操作從而使得條件滿足。可以通過wait()方法或者Thread.join()方法都會使執行緒進入等待狀態。

實際上不用可以區分兩者, 因為兩者都會暫停執行緒的執行。兩者的區別是: 進入WAITING狀態是執行緒主動的, 而進入BLOCKED狀態是被動的。更進一步的說, 進入BLOCKED狀態是在同步(synchronized程式碼之外), 而進入WAITING狀態是在同步程式碼之內。

例如:

synchronized(obj){
  obj.wait()
}

在這個同步程式碼塊中,我們通過synchronize關鍵字去獲取obj物件的同步鎖,如果沒有獲取到,這時候被動就會進入BLOCKED狀態。直到獲取到了鎖,從阻塞狀態進入就緒/執行狀態,然後呼叫obj.wait(),主動進入WAITING狀態進如狀態,直到其他執行緒在同步程式碼塊中呼叫了obj.notify()/obj.notifyAll(),又會從WAITING狀態進入進入就緒/執行狀態。

死鎖

死鎖是指兩個或兩個以上的程序在執行過程中,由於競爭資源或者由於彼此通訊而造成的一種阻塞的現象,若無外力的作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖。

來看這個例子:
小明和小王都去買餃子皮和餡料,但是它們都各自只有一份,這時候小明搶到了餃子皮,小王搶到了餡料,但他們都各自不肯鬆手,但是隻有一份原料,又都包不成餃子。所以兩個人都只能僵持著,這就是所謂的死鎖。

所以,死鎖有這些特點:

  • 多個操作者(M>=2),爭奪多個資源(N>=2),且N<=M
  • 爭奪資源的順序不對
  • 拿到資源後不放手

用凝練一點的語言來描述,那就是:

  1. 互斥:一個時間同一個資源只能由一個程序持有
  2. 持有並等待:程序保持至少一個資源,並等待其他程序持有的額外資源
  3. 不剝奪:程序持有資源之後不會被其他程序剝奪
  4. 迴圈等待:程序互相等待各自的資源

我們用一段程式碼來進行死鎖的演示:

/**
 * 類說明:演示死鎖的產生
 */
public class DeadLock {

    private static Object lock1 = new Object();//第一個鎖
    private static Object lock2 = new Object();//第二個鎖

    //第一個拿鎖的方法
    private static void firstDo() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        synchronized (lock1) {
            System.out.println(threadName + " get lock-1");
            Thread.sleep(100);
            synchronized (lock2) {
                System.out.println(threadName + " get lock-2");
            }
        }
    }

    //第二個拿鎖的方法
    private static void secondDo() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        synchronized (lock2) {
            System.out.println(threadName + " get lock-2");
            Thread.sleep(100);
            synchronized (lock1) {
                System.out.println(threadName + " get lock-1");
            }
        }
    }

    private static class TestThread extends Thread {

        private String name;

        public TestThread(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            Thread.currentThread().setName(name);
            try {
                firstDo();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.currentThread().setName("Thread A");
        TestThread testThread = new TestThread("Thread 2");
        testThread.start();
        secondDo();
    }
}

在實際開發過程中,我們應避免死鎖的產生,因為死鎖會進入一個等待的狀態,並不會丟擲異常,不利於我們查詢錯誤。那麼怎麼來避免死鎖呢?

由於多個操作者來爭搶多個資源,這是由業務邏輯來決定的,所以我們一般從後者入手,比如以正確的順序去爭奪資源,或者拿到資源後允許放手,都可以解決死鎖。

ReentrantLock中的tryLock()就可以用於死鎖的解決。它可以對顯式鎖嘗試獲取,並返回boolean值。程式碼段可以這麼寫

  if (lock.tryLock()) {
      try {
          //TODO
      } finally {
          lock.unlock();
      }
  } else {
      // perform alternative actions
  }

活鎖

在上文中,我們使用tryLock()來解決死鎖,但是不正當使用的,有可能會造成活鎖的產生。來看這麼一段程式碼:


/**
 *類說明:演示嘗試拿鎖解決死鎖
 */
public class TryLock {
    private static Lock lock1 = new ReentrantLock();//第一個鎖
    private static Lock lock2 = new ReentrantLock();//第二個鎖

    //先嚐試拿lock1 鎖,再嘗試拿lock2鎖,lock1鎖沒拿到,連同lock2鎖一起釋放掉
    private static void firstToSecond() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        Random r = new Random();
        while(true){
            if(lock1.tryLock()){
                System.out.println(threadName +" get lock1");
                try{
                    if(lock2.tryLock()){
                        try{
                            System.out.println(threadName +" get lock2");
                            System.out.println("firstToSecond do work------------");
                            break;
                        }finally{
                            System.out.println(threadName +" release lock2");
                            lock2.unlock();
                        }
                    }
                }finally {
                    System.out.println(threadName +" release lock1");
                    lock1.unlock();
                }

            }
            //Thread.sleep(r.nextInt(3));
        }
    }

    //先嚐試拿lock2 鎖,再嘗試拿lock1鎖,lock2鎖沒拿到,連同lock1鎖一起釋放掉
    private static void SecondToFirst() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        Random r = new Random();
        while(true){
            if(lock2.tryLock()){
                System.out.println(threadName +" get lock2");
                try{
                    if(lock1.tryLock()){
                        try{
                            System.out.println(threadName +" get lock1");
                            System.out.println("SecondToFirst do work------------");
                            break;
                        }finally{
                            System.out.println(threadName +" release lock1");
                            lock1.unlock();
                        }
                    }
                }finally {
                    System.out.println(threadName +" release lock2");
                    lock2.unlock();
                }

            }
            //Thread.sleep(r.nextInt(3));
        }
    }

    private static class TestThread extends Thread{

        private String name;

        public TestThread(String name) {
            this.name = name;
        }

        public void run(){
            Thread.currentThread().setName(name);
            try {
                SecondToFirst();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("Thread A");
        TestThread testThread = new TestThread("Thread B");
        testThread.start();
        try {
            firstToSecond();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在我們沒有加上Thread.sleep(r.nextInt(3));之前,執行緒之間的相互等待的過程可能會被急劇拉長。為什麼呢?比如馬路中間有條小橋,只能容納一輛車經過,橋兩
頭開來兩輛車A和B,A比較禮貌,示意B先過,B也比較禮貌,示意A先過,結果兩人一直謙讓誰也過不去。

在上面的程式碼中,有執行緒A、B,分別去獲取鎖1和2

  1. A先競爭到1,然後嘗試去競爭2
  2. B先競爭到2,然後嘗試去競爭1
  3. A釋放鎖1,B釋放鎖2
  4. A再競爭到1,然後嘗試去競爭2
  5. B再競爭到2,然後嘗試去競爭1

這樣一來,A和B執行緒就一直在相互競爭中迴圈等待著,但是跟死活不一樣,進入了死鎖的執行緒,是不進行工作的,而進入活鎖的執行緒,是在忙碌的工作著。而我們加上休眠,可以讓兩個執行緒在競爭鎖的時候,時間錯開一點,避免了活鎖。

CAS基本操作(Compare And Swap)

原子操作

我們都知道,在沒有發現電子、原子核之前,科學界所認識到物質的最小單位就是原子,原子就是不可再分的。那麼反映併發程式設計中,所謂原子操作是指不會被執行緒排程機制打斷的操作;這種操作一旦開始,就一直執行到結束,不會發生上下文的切換。

如何實現原子操作

synchronized包圍的程式碼塊,其實就是一個原子操作。但是,使用synchronized是一個很消耗效能的操作,因為會涉及到執行緒的狀態變化,沒有搶到內建鎖的執行緒,會進入等待佇列裡面並等待。但假如我們的同步程式碼塊裡面只有簡單的i++,那麼使用synchronized是不是就太大題小做了,有沒有一種更輕量級的同步機制呢?

為了解決這個問題,在現代CPU裡面,提供了一種Compare And Swap指令,簡稱CAS

CAS指令原理

Compare And Swap,從名字來看,就是包含比較並交換兩個步驟,但是在CPU提供的CAS指令已經包含這兩個操作了,由CPU去保證這兩個步驟是原子的。意思就是說,比較和交換兩個動作,要麼全都完成,要麼就全都不執行。

那麼CAS指令是如何保證執行緒的同步的呢,我們就拿簡單的i++來看。假如使用synchronized,那就是誰搶到了鎖,誰就去執行i++。那麼在CAS指令中,是怎麼操作的呢。

假如i初始值為0,有A~D四個執行緒,都要執行i++這個操作。首先,四個執行緒都從記憶體裡面取出i=0,然後在自己的方法棧上,進行i++,i變為1,這時候要重新寫回記憶體的時候,同一時間只允許一個執行緒進行操作。假設A執行緒拿到了這個許可權,這時候它會再次從記憶體中取出i的值,如果i==0,則把計算得到的值寫回去,這個執行緒就執行完了,輪到其他執行緒來執行。這時候B執行緒從記憶體中取出i,悲催地發現i已經等於1了,說明i的值已經被人讀寫過了,那麼就應該重新執行i++。其他執行緒也一樣。

所以說,CAS其實就是不斷重複這個指令(自旋),直到成功為止。

在這裡涉及到了悲觀鎖跟樂觀鎖的概念。在使用synchronized同步關鍵字的時候,執行緒會悲觀的認為,總有其他執行緒想來害它自己,不如先用鎖把程式碼塊鎖起來,i++這個過程只有自己能做,直到我自己做完了,才讓別人去接著幹這個事情。而對於CAS來說,執行緒會樂觀的認為,沒人會來改自己的東西,我先把值取出來,先改了再說。但它也不傻,還是會去檢查結果,發現已經被改過,那也沒辦法,只能再來一遍。

CAS指令問題

既然CAS指令在執行效率要高於synchronized,那麼是不是可以替代synchronized呢?

答案是不可以,因為CAS存在以下問題:

  1. ABA問題
  2. 開銷問題
  3. 只能保證一個共享變數的原子操作

我們來看以下思考:

假設有執行緒1和執行緒2,並且有一個變數A,對於1來說,它要把A改為B,假設執行緒2跑的更快,它先把A改為C,再改回A。那麼對於執行緒1來說,在執行CAS指令的時候,發現A的值沒有變化吧,並沒有修改過,然後放心的將它改寫為B。

但實際上,這個A已經不是原來的A了,已經被執行緒2修改過。但是CAS操作並沒有辦法去發現。拿一個實際的例子來說,你的水杯裡面裝滿了你最喜歡的可樂,這時候你有事去打了一個電話,你的女朋友口渴了,突然喝了一口,然後她怕被你發現,又將水杯給重新倒滿了。你打完電話回來,一看水杯是滿的,並沒有人喝過,然後繼續高興的吃你的炸雞。這個就是ABA問題。

那麼要怎麼解決ABA問題呢,我們可以加上一個版本戳,每次修改,都會對更新當前變數的版本。

而開銷問題就是說,因為CAS指令是基於自旋來實現的,但是執行緒如果長時間不能成功執行,會給CPU帶來非常大的執行開銷。

CAS指令是通過比較記憶體中某個變數的值,來決定是否能進行交換操作,對於計算機來說,一個地址只能存放一個變數。所以CAS指令一次只能保證一個共享變數的原子操作。假如有ABC三個變數,需要保證它們的讀寫是一個原子操作,那麼CAS就不能實現了,而使用synchronized就能很好的執行。從Java 1.5開始,JDK提供了AtomicReference類來保證引用物件之間的原子性,就可以把多個變數放在一個物件裡來進行CAS操作。

Java中的原子操作類

Jdk中為我們提供瞭如下相關原子操作類:

  • 更新基本型別類:AtomicBoolean,AtomicInteger,AtomicLong
  • 更新陣列類:AtomicIngerArray,AtomicLongArray,AtomicReferenceArray
  • 更新引用型別:AtomicReference,AtomicMarkableReference,AtomicStampedReference