1. 程式人生 > 實用技巧 >Java(29)java多執行緒

Java(29)java多執行緒

執行緒的基本概念

首先理解些基本概念:程式--》程序--》執行緒

  • 程式(program):是指為完成特定任務、用某種程式語言編寫的一組指令的集合。即指一段靜態的程式碼,靜態物件。

  • 程序(process):是程式的一次執行過程,或者正在執行的一個程式。動態過程:有它自身的產生、存在和消亡的過程。

    • 比如執行中的QQ,執行中的MP3播放器,都是程序(動態的)。
    • 程式--》靜態,程序--》動態
  • 執行緒(thread):程序可進一步細化成執行緒,是一個程式內部的一條執行路徑

    • 若一個程式可同一時間執行多個執行緒,就是支援多執行緒的
    • 執行緒相當於程序的一條分支。

再來理解一些概念:

  • 併發:一個處理器同時處理多個任務,邏輯上的同時發生。
  • 並行:多個處理器或者多核處理器同時處理不同的任務,物理上的同時發生。
    • 併發是一個人同時吃三個饅頭,並行是三個人同時吃三個饅頭。併發的一個處理器掛了,所有任務都掛了,並行的某個處理器掛了,也只有它自己對應的任務受影響,另外兩個處理器及其執行的任務不受影響。

在討論什麼是執行緒前有必要先說下什麼是程序,因為執行緒是程序中的一個實體,執行緒本身是不會獨立存在的。 程序是程式碼在資料集合上的一次執行活動, 是系統進行資源分配 和排程的基本單位, 執行緒則是程序的一個執行路徑, 一個程序中至少有一個執行緒,程序中的多個執行緒共享程序的資源。

作業系統在分配資源時是把資源分配給程序的, 但是 CPU 資源比較特殊, 它是被分配到執行緒的, 因為真正要佔用 CPU 執行的是執行緒, 所以也說執行緒是 CPU 分配的基本單位

Java 中,當我們啟動 main 函式時其實就啟動了一個JVM 的程序, 而 main 函式所 在的執行緒就是這個程序中的一個執行緒,也稱主執行緒。

由圖 1-1 可以看到, 一個程序中有多個執行緒,多個執行緒共享程序的堆和方法區資源, 但是每個執行緒有自己的程式計數器和棧區域。

程式計數器是一塊記憶體區域,用來記錄執行緒當前要執行的指令地址。 那麼為何要將程式計數器設計為執行緒私有的呢?前面說了執行緒是佔用 CPU 執行的基本單位,而 CPU 一 般是使用時間片輪轉方式讓執行緒輪詢佔用的

,所以當前執行緒 CPU 時間片用完後,要讓出 CPU,等下次輪到自 己的時候再執行。 那麼如何知道之前程式執行到哪裡了呢?其實程式計數器就是為了記錄該執行緒讓出 CPU 時的執行地址的,待再次分配到時間片時執行緒就可以從自己私有的計數器指定地址繼續執行。 另外需要注意的是,如果執行的是 native 方法, 那麼 pc 計數器記錄的是 undefined 地址,只有執行的是 Java 程式碼時 pc 計數器記錄的才是 下一條指令的地址。

另外每個執行緒都有自己的棧資源,用於儲存該執行緒的區域性變數,這些區域性變數是該執行緒私有的,其他執行緒是訪問不了的,除此之外棧還用來存放執行緒的呼叫棧幀。

堆是一個程序中最大的一塊記憶體,堆是被程序中的所有執行緒共享的,是程序建立時分配的,堆裡面主要存放使用 new 操作建立的物件例項。

方法區則用來存放JVM載入的類、常量及靜態變數等資訊,也是執行緒共享的。

多執行緒應用場景

何時需要多執行緒?

  1. 程式需要同時執行多個任務
  2. 程式需要實現一些等待的任務時,如使用者輸入、檔案讀寫操作、網路操作、搜尋等。
  3. 需要一些後臺執行的程式時。
    • 因為多執行緒是程序的分支。當分支之後,就各走各的。假設在程序上跑的程式碼是主程式,當其中的第三行程式碼是開啟執行緒的,那麼,開啟執行緒之後執行緒執行的的程式碼就是和主程式並行(它們之間不相干了)

多執行緒的建立與啟動

java語言的JVM允許程式執行多個執行緒,它通過java.lang.Thread類來實現。

Thread類的特性:

  1. 每個執行緒都是通過某個特定Thread物件的run()方法來完成操作的,經常把run()方法的主體稱為執行緒體。要執行的程式碼邏輯寫在run()方法裡。
  2. 通過該Thread物件的start()方法來呼叫這個執行緒。本質是呼叫run()方法

Thread類主要方法

Thread類的構造方法:

public Thread()//建立新的Thread物件
public Thread(String threadname) //建立執行緒並指定執行緒例項名
public Thread(Runnable target) // 指定建立執行緒的目標物件,它實現了Runnable介面的run()方法
public Thread(Runnable target,String threadname) //建立新的Thread物件

Thread類的方法:

void start()  //啟動執行緒
run() //執行緒被排程時執行的操作
String getName()//返回執行緒的名稱
void setName()//設定執行緒名稱
static currentThread()//返回當前執行緒

方式1:建立多執行緒——繼承Thread類

1、建立一個類,繼承Thread類並重寫run()方法

2、建立Thread例項物件,並執行執行緒。

class MyThread extends Thread{
    @Override
    public void run() {
        System.out.println("多執行緒執行的程式碼寫在這個run()方法裡");
        for (int i = 0; i <5 ; i++) {
            System.out.println("湖人總冠軍");
        }
    }
}
public class Test {
    public static void main(String[] args) {
        Thread td=new MyThread();
        td.start();
        for (int i = 0; i <300 ; i++) {
            System.out.println("==============================");
            System.out.println("==============================");
            System.out.println("==============================");
        }

    }
}

/*執行結果大致如下:
==============================
多執行緒執行的程式碼寫在這個run()方法裡
湖人總冠軍
湖人總冠軍
湖人總冠軍
湖人總冠軍
湖人總冠軍
==============================
==============================
*/

從上面的案例結果可以看到,main()方法中列印的內容與開啟執行緒的run()方法中的列印語句是混合起來的,而每一次執行,列印順序都是不固定的。

這是因為main()方法執行td.start()方法開啟多執行緒之後,就相當於在main()方法之外開啟了一條支流,這個時候,td.start()之後的main()方法中的其它程式碼的執行就跟run()方法執行無關了

這個就是多執行緒的非同步性

public class Test {
    public static void main(String[] args) {
        Thread td=new MyThread();
        td.start();
        Thread t2=new MyThread();
        t2.start();
        Thread t3=new MyThread();  
        t3.start();
        System.out.println("=============");
    }
}

從上面程式碼可以看到,執行緒的個數是可以自己適當增加的

方式2:建立多執行緒——實現Runnable介面

建立一個類,實現Runnable介面,並重寫run()方法。

class MyThread2 implements Runnable {
    @Overridejava
    public void run() {
        System.out.println(Thread.currentThread().getName()+"Runnable多執行緒執行的程式碼");
        for (int i = 0; i <5; i++) {
            System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼");
        }
    }
}
public class Test {
    public static void main(String[] args) {
//        Thread(Runnable target)
        Thread th1=new Thread(new MyThread2()); //new Mythread2()就是target
        th1.start();
//        Thread(Runnable target, String name)
        Thread th2=new Thread(new MyThread2(),"th2");  //定義的執行緒名稱的作用看輸出
        th2.start();
        
        System.out.println("=========================");
        
    }
}
/*執行結果為:
=========================
Thread-0Runnable多執行緒執行的程式碼
th2Runnable多執行緒執行的程式碼
th2這是多執行緒的邏輯程式碼
th2這是多執行緒的邏輯程式碼
th2這是多執行緒的邏輯程式碼
th2這是多執行緒的邏輯程式碼
th2這是多執行緒的邏輯程式碼
Thread-0這是多執行緒的邏輯程式碼
Thread-0這是多執行緒的邏輯程式碼
Thread-0這是多執行緒的邏輯程式碼
Thread-0這是多執行緒的邏輯程式碼
Thread-0這是多執行緒的邏輯程式碼
*/
//Thread-0是系統給出的執行緒預設名稱

兩種建立執行緒方式的區別

從案例1和案例2可以知道建立多執行緒的兩個方式:

  • 繼承Thread
  • 實現Runnable介面

一般,推薦使用實現Runnable介面的方式

  • 這樣可以避免單繼承的問題。單繼承就是說一個類只能繼承一個類,不可繼承多個類。使用介面方式我們的執行緒類就還可以繼承其它的類。

  • 介面方式使得多個執行緒可以共享同一個介面實現類的物件,非常適合多個相同的執行緒來處理同一份資源。詳情看下列程式碼:

    public class Test {
        public static void main(String[] args) {
            Runnable ra=new MyThread3();
            Thread t1=new Thread(ra);
            t1.start();	
            Thread t2=new Thread(ra);
            t2.start();
        }
    }
    
    class MyThread3 implements Runnable {
        int count=1;
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"Runnable多執行緒執行的程式碼");
            for (int i = 0; i <5; i++) {
                count++;
                System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼"+count);
            }
        }
    }
    /*執行結果為:
    Thread-0Runnable多執行緒執行的程式碼
    Thread-0這是多執行緒的邏輯程式碼2
    Thread-0這是多執行緒的邏輯程式碼3
    Thread-0這是多執行緒的邏輯程式碼4
    Thread-0這是多執行緒的邏輯程式碼5
    Thread-0這是多執行緒的邏輯程式碼6
    Thread-1Runnable多執行緒執行的程式碼
    Thread-1這是多執行緒的邏輯程式碼7
    Thread-1這是多執行緒的邏輯程式碼8
    Thread-1這是多執行緒的邏輯程式碼9
    Thread-1這是多執行緒的邏輯程式碼10
    Thread-1這是多執行緒的邏輯程式碼11
    */
    //從輸出結果可知,Thread-0執行緒和Thread-1執行緒是共享同一個count的
    

主執行緒可以獲取子執行緒返回值的方法——FutureTask

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class Test{
    public static void main(String[] args) {
        FutureTask<String> futureTask=new FutureTask<>(new CallerTask());
        new Thread(futureTask).start();
        try {
            String result=futureTask.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
class CallerTask implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("FutureTask");
        return "hello";
    }
}

執行結果:

上面程式碼中,CallerTask類實現了Callable介面的call()方法。

FutureTask物件作為任務建立一個執行緒並且啟動它,可以通過get()方法等待任務執行完畢並獲取返回結果

多執行緒的優點

  1. 提高應用程式的響應。對圖形介面更有意義,可增強使用者體驗。
  2. 提高CPU利用率
  3. 改善程式結構,長而複雜的程序分為多個執行緒,獨立執行,利於理解和修改。
    • 比如一個方法裡有1000行程式碼,前300行,中間300行,最後400行,如果這三段程式碼沒有因果關係,這種情況我們就可以使用執行緒處理,把這三段程式碼分別放在不同的執行緒中去執行,這三段程式碼是並行執行的。

執行緒的優先順序

執行緒的優先順序就是哪個執行緒有較大的概率先執行。只是說概率比較大,並不是絕對的。

優先順序是用陣列1-10表示,資料越大優先順序越高,如果沒有設定,預設優先順序是5。

getPriority(); //獲取執行緒優先值:
setPriority(int ..) //設定執行緒的優先順序

優先順序案例

public class Test {
    public static void main(String[] args) {
        Thread t1=new Thread(new MyThread4(),"t1");
        t1.setPriority(3);
        t1.start();

        Thread t2=new Thread(new MyThread4(),"t2");
        t2.setPriority(4);
        t2.start();
    }
}

class MyThread4 implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <5; i++) {
            System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼");
        }
    }
}
/*執行結果為:
t1這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
*/

從上面結果可以知道,t2的有更多的內容執行在前面。每次執行,結果都可能不一樣

執行緒讓步 yield()

暫停正在執行的執行緒,把執行機會讓給優先級別相同或更高的執行緒。注意,不是說執行緒A禮讓了,就一定會執行執行緒B,執行緒A禮讓了只是說A回到了就緒狀態,回到後,A還可能搶到CPU時間片。

若佇列中沒有同優先順序的執行緒,忽略此方法

public class Test {
    public static void main(String[] args) {
        Thread t1=new Thread(new MyThread4(),"t1");
        t1.start();
        Thread t2=new Thread(new MyThread4(),"t2");
        t2.start();
    }
}

class MyThread4 implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <5; i++) {
            if (i % 2 == 0) {
                Thread.yield();//執行緒讓步
            }
            System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼");
        }
    }
}
/*執行結果為:
t1這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
*/

執行緒阻塞 join()

當某個程式執行流中呼叫其它執行緒的join()方法時,呼叫的執行緒將被阻塞,直到join()方法加入的join執行緒被執行完。

優先順序低的執行緒也可以獲得執行

public class Test {
    public static void main(String[] args) {
        Thread t1=new Thread(new MyThread4(),"t1");
        t1.start();
        Thread t2=new Thread(new MyThread4(),"t2");
        t2.start();

        System.out.println("1====================1");
        try {
            t1.join(); //相當於把t1的run()方法插入到這個位置執行
            /**
             * 阻塞當前的main方法,先不執行System.out.println("2======================2");程式碼
             * 先執行join進來的執行緒的程式碼
             */
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println("2======================2");
        System.out.println("3======================3");
    }
}

class MyThread4 implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <5; i++) {
            System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼");
        }
    }
}


/*執行將結果為:
1====================1
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
2======================2
3======================3
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
t2這是多執行緒的邏輯程式碼
*/

執行緒睡眠 sleep()

package com.jimmy.day05;

public class Test {
    public static void main(String[] args) {
        Thread t1=new Thread(new MyThread4(),"t1");
        t1.start();
    }
}

class MyThread4 implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <5; i++) {
            try {
                Thread.sleep(1000); // 當前執行緒睡眠1000毫秒
                //也就是當前迴圈每隔1秒才迴圈一次
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼");
        }
    }
}
/*執行結果為:
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
t1這是多執行緒的邏輯程式碼
*/
//輸出結果每隔一秒才輸出一行

強制結束執行緒生命週期 stop()

執行緒還沒有執行完就把它結束掉。該方法過時了

package com.jimmy.day05;

public class Test {
    public static void main(String[] args) {
        Thread t1=new Thread(new MyThread4(),"t1");
        t1.start();
        System.out.println("===================");
        System.out.println("===================");
        t1.stop();
    }
}

class MyThread4 implements Runnable {
    int count=0;
    @Override
    public void run() {
        for (int i = 0; i <5; i++) {
            count++;
            System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼"+count);
        }
    }
}
/*執行結果為:
===================
===================
*/

判斷執行緒是否存活 isAlive()

package com.jimmy.day05;

public class Test {
    public static void main(String[] args) {
        Thread t2=new Thread(new MyThread4(),"t2");
        t2.start();
        try {
            t2.join();
        } catch (InterruptedException e) {
            ejava.printStackTrace();
        }
        System.out.println(t2.isAlive());
    }
}

class MyThread4 implements Runnable {
    int count=0;
    @Override
    public void run() {
        for (int i = 0; i <3; i++) {
            count++;
            System.out.println(Thread.currentThread().getName()+"這是多執行緒的邏輯程式碼"+count);
        }
    }
}
/*執行結果為:
t2這是多執行緒的邏輯程式碼1
t2這是多執行緒的邏輯程式碼2
t2這是多執行緒的邏輯程式碼3
false
*/

執行緒的生命週期

生命週期:執行緒從生到死的經歷。

執行緒生命週期的5種狀態:

  1. 新建:Thread類被宣告建立時。
  2. 就緒:使用start()方法後。
  3. 執行:就緒的執行緒被排程並獲得處理器資源,進入執行狀態。
  4. 阻塞:被人為掛起或執行輸入輸出操作時,讓出CPU並臨時終止自己的執行,進入阻塞狀態。類似堵車了,車不能動。
  5. 死亡:全部工作完成或者人為強制關閉執行緒。

其實阻塞狀態有一些比較特殊的,我們可以將生命週期劃分得更詳細一些,如下圖,下面的等待佇列和鎖池都可以看成是阻塞狀態。

同步鎖synchronized

多個執行緒執行導致的問題

  • 多個執行緒的不確定性引起執行結果的不穩定

  • 多個執行緒對賬本的共享,會造成操作的不完整性,會破壞資料。

問題案例1:不同使用者從同一個賬戶取錢

package com.jimmy.day05;

public class Test{
    public static void main(String[] args) {
        //建立一個賬戶,並初始化餘額為3000
        Accout a1=new Accout(3000);
        //建立兩個要從上面賬戶取錢的使用者
        User u1=new User(a1,2000);
        User u2=new User(a1,2000);
        //執行多執行緒,同時取錢
        Thread jimmy=new Thread(u1,"Jimmy");
        Thread krystal=new Thread(u2,"krystal");
        jimmy.start();
        krystal.start();
    }
}
//定義類,模擬使用者
class Accout{
    //初始化使用者餘額
    private int money;
    public Accout(int num){
        this.money=num;
        System.out.println("建立了一個餘額為3000的使用者");
    }
    //定義取錢方法;
    public  void drawFun(int m){
        if(money-m<0){
            System.out.println(Thread.currentThread().getName()+" : "+"餘額不足了,取錢失敗");
            System.out.println(Thread.currentThread().getName()+" : "+"當前賬戶餘額為:"+money);
        }else{
            money=money-m;
            System.out.println(Thread.currentThread().getName()+" : "+"取了"+m+"元");
            System.out.println(Thread.currentThread().getName()+" : "+"餘額變成了: "+money);
        }
    }
}
//定義執行緒類,模擬使用者取款
class User implements Runnable{
    Accout accout;
    int m;
    public User(Accout accout,int m){
        this.accout=accout;
        this.m=m;
    }
    @Override
    public void run() {
        accout.drawFun(m);
    }
}
/*執行結果為:
建立了一個餘額為3000的使用者
Jimmy : 取了2000元
krystal : 餘額不足了,取錢失敗
krystal : 當前賬戶餘額為:1000
Jimmy : 餘額變成了: 1000
*/
可以看到,jimmy取錢操作還沒完整執行,krystal就過來插入執行了。
一個執行緒還沒執行完,另一個執行緒就運行了。這個就是多執行緒的問題。

問題案例2:

class Test {
    public static void main(String[] args) {
        Runnable r=()->{
            while(TicketCenter.restCount>0){
                System.out.println(Thread.currentThread().getName()+"賣出一張票,剩餘"+ --TicketCenter.restCount + "張");
            }
        };
        Thread t1=new Thread(r,"thread -1");
        Thread t2=new Thread(r,"thread -2");
        Thread t3=new Thread(r,"thread -3");
        Thread t4=new Thread(r,"thread -4");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}
class TicketCenter{
    public static int restCount=100;
}
/*執行輸出結果:
thread -4賣出一張票,剩餘99張
thread -1賣出一張票,剩餘97張
thread -2賣出一張票,剩餘96張
thread -3賣出一張票,剩餘98張
thread -3賣出一張票,剩餘92張
thread -3賣出一張票,剩餘91張
thread -2賣出一張票,剩餘93張
thread -2賣出一張票,剩餘89張
thread -2賣出一張票,剩餘88張
thread -2賣出一張票,剩餘87張
thread -2賣出一張票,剩餘86張
thread -2賣出一張票,剩餘85張
thread -1賣出一張票,剩餘94張
*/  產生這種結果的原因可能是,一個執行緒計算完剩餘票數後,還沒列印,另一個執行緒就搶到了CPU時間片

上面問題的解決辦法是加同步鎖。

在普通方法上加同步鎖synchronized鎖的是整個物件,不是某一個方法不同的物件就是不同的鎖

案例1:執行緒針對同一個物件時,是同把鎖。

package com.jimmy.day05;

public class Test{
    public static void main(String[] args) {
        T1 emm=new T1();   //下面所有的執行緒針對的都是同一個物件
        Thread t1=new Thread(emm,"t1");
        Thread t2=new Thread(emm,"t2");
        Thread t3=new Thread(emm,"t3");
        t1.start();
        t2.start();
        t3.start();
    }
}
class T1 implements Runnable{
    @Override
    public synchronized void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName()+"鎖的應用");
        }
    }
}
//從下列執行結果看到,鎖住了,執行緒不會交叉同時執行。
/*執行結果為:
t1鎖的應用
t1鎖的應用
t1鎖的應用
t1鎖的應用
t1鎖的應用
t2鎖的應用
t2鎖的應用
t2鎖的應用
t2鎖的應用
t2鎖的應用
t3鎖的應用
t3鎖的應用
t3鎖的應用
t3鎖的應用
t3鎖的應用
*/

案例2:執行緒針對不同物件時,不是同把鎖

package com.jimmy.day05;

public class Test{
    public static void main(String[] args) {
        Thread t1=new Thread(new T1(),"t1");
        Thread t2=new Thread(new T1(),"t2");
        Thread t3=new Thread(new T1(),"t3");
        t1.start();
        t2.start();
        t3.start();
    }
}
class T1 implements Runnable{
    @Override
    public synchronized void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName()+"鎖的應用");
        }
    }
}
//從下列結果知,雖然方法加上了同步鎖,但是執行緒間還是交叉同步運行了,是因為執行緒針對的不同的物件。
/*執行結果為:
t1鎖的應用
t1鎖的應用
t1鎖的應用
t2鎖的應用
t2鎖的應用
t2鎖的應用
t1鎖的應用
t1鎖的應用
t2鎖的應用
t2鎖的應用
t3鎖的應用
t3鎖的應用
t3鎖的應用
t3鎖的應用
t3鎖的應用
*/

案例3:使用static synchronized,對所有物件是同把鎖,可看成是類鎖。

package com.jimmy.day05;

public class Test{
    public static void main(String[] args) {
        //建立一個賬戶,並初始化餘額為3000
        Accout a1=new Accout(3000);
        //建立兩個要從上面賬戶取錢的使用者
        User u1=new User(a1,2000);
        User u2=new User(a1,2000);
        //執行多執行緒,同時取錢
        Thread jimmy=new Thread(u1,"Jimmy");
        Thread krystal=new Thread(u2,"krystal");
        jimmy.start();
        krystal.start();
    }
}
//定義類,模擬使用者
class Accout{
    //初始化使用者餘額
    private static int money;
    public Accout(int num){
        this.money=num;
        System.out.println("建立了一個餘額為3000的使用者");
    }
    //定義取錢方法;
    public  static synchronized void drawFun(int m){
        if(money-m<0){
            System.out.println(Thread.currentThread().getName()+" : "+"餘額不足了,取錢失敗");
            System.out.println(Thread.currentThread().getName()+" : "+"當前賬戶餘額為:"+money);
        }else{
            money=money-m;
            System.out.println(Thread.currentThread().getName()+" : "+"取了"+m+"元");
            System.out.println(Thread.currentThread().getName()+" : "+"餘額變成了: "+money);
        }
    }
}
//定義執行緒類,模擬使用者取款
class User implements Runnable{
    Accout accout;
    int m;
    public User(Accout accout,int m){
        this.accout=accout;
        this.m=m;
    }
    @Override
    public void run() {
        accout.drawFun(m);
    }
}
//兩個執行緒不再交叉同時執行。
/*執行結果為:
建立了一個餘額為3000的使用者
krystal : 取了2000元
krystal : 餘額變成了: 1000
Jimmy : 餘額不足了,取錢失敗
Jimmy : 當前賬戶餘額為:1000
*/ 

案例4:同步程式碼段(上面的加鎖方式是同步方法)

上面都是給方法加上synchronized關鍵字,我們還可以使用同步程式碼段的方式來解決:

class Test {
    public static void main(String[] args) {
        Runnable r=()->{
            while(TicketCenter.restCount>0){
                synchronized ("鎖"){
                    System.out.println(Thread.currentThread().getName()+"賣出一張票,剩餘"+ --TicketCenter.restCount + "張");
                }
            }
        };
        Thread t1=new Thread(r,"thread -1");
        Thread t2=new Thread(r,"thread -2");
        Thread t3=new Thread(r,"thread -3");
        Thread t4=new Thread(r,"thread -4");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}
class TicketCenter{
    public static int restCount=100;
}
/*
thread -2賣出一張票,剩餘99張
thread -2賣出一張票,剩餘98張
thread -2賣出一張票,剩餘97張
thread -2賣出一張票,剩餘96張
...
thread -2賣出一張票,剩餘1張
thread -2賣出一張票,剩餘0張
thread -4賣出一張票,剩餘-1張
thread -1賣出一張票,剩餘-2張
thread -3賣出一張票,剩餘-3張

*/ 列印是有序的,但是會有負數,這是因為,在同步程式碼段外,有3個執行緒進入了迴圈體,且在等待,一旦這3個執行緒搶到鎖,就不會再判斷TicketCenter.restCount>0,所以,我們還要加一個條件,改進的程式碼如下:
    
class Test {
    public static void main(String[] args) {
        Runnable r=()->{
            while(TicketCenter.restCount>0){
                synchronized ("鎖"){
                    if(TicketCenter.restCount<=0){
                        return;
                    }
                    System.out.println(Thread.currentThread().getName()+"賣出一張票,剩餘"+ --TicketCenter.restCount + "張");
                }
            }
        };
        Thread t1=new Thread(r,"thread -1");
        Thread t2=new Thread(r,"thread -2");
        Thread t3=new Thread(r,"thread -3");
        Thread t4=new Thread(r,"thread -4");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}
class TicketCenter{
    public static int restCount=100;
}

顯式鎖ReenTrantLock

關鍵程式碼:

  1. 建立顯示鎖物件:ReentrantLock mylock=new ReentrantLock();
  2. 上鎖:mylock.lock();
  3. 解鎖:mylock.unlock();
import java.util.concurrent.locks.ReentrantLock;

class Test {
    public static void main(String[] args) {
        ReentrantLock mylock=new ReentrantLock();
        Runnable r=()->{
            while(TicketCenter.restCount>0){
                synchronized ("鎖"){
                    mylock.lock();  //上鎖
                    if(TicketCenter.restCount<=0){
                        mylock.unlock();  
                        //如果這裡退出迴圈時不解鎖,會導致某一個執行緒持續佔用CPU資源,程式不會停掉
                        return;
                    }
                    System.out.println(Thread.currentThread().getName()+"賣出一張票,剩餘"+ --TicketCenter.restCount + "張");
                    mylock.unlock();  //解鎖
                }
            }
        };
        Thread t1=new Thread(r,"thread -1");
        Thread t2=new Thread(r,"thread -2");
        Thread t3=new Thread(r,"thread -3");
        Thread t4=new Thread(r,"thread -4");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}
class TicketCenter{
    public static int restCount=100;
}
/*
thread -1賣出一張票,剩餘99張
thread -1賣出一張票,剩餘98張
thread -1賣出一張票,剩餘97張
thread -1賣出一張票,剩餘96張
...
thread -2賣出一張票,剩餘2張
thread -2賣出一張票,剩餘1張
thread -2賣出一張票,剩餘0張
*/

死鎖

在程式當中,應該儘量避免死鎖的出現。

什麼是死鎖,看下面例子:

import java.util.concurrent.locks.ReentrantLock;

class Test {
    public static void main(String[] args) {
        ReentrantLock mylock=new ReentrantLock();
        Runnable r1=()->{
                synchronized ("A"){
                    System.out.println("執行緒1持有了鎖A");
                    synchronized ("B"){
                        System.out.println("執行緒1同時持有了鎖A和鎖B");
                    }
                }
        };

        Runnable r2=()->{
                synchronized ("B"){
                    System.out.println("執行緒2持有了鎖B");
                    synchronized ("A"){
                        System.out.println("執行緒2同時持有了鎖A和鎖B");
                    }
                }
        };
        Thread t1=new Thread(r1);
        Thread t2=new Thread(r2);
        t1.start();
        t2.start();
    }
}

程式執行介面:

可以看到,程式沒有結束,一直執行著,而且,兩個執行緒都沒有打印出同時持有鎖A和鎖B,這個就是死鎖。多個執行緒持有對方的鎖物件,但是又不釋放自己的鎖。

當然,如果其中一個執行緒執行足夠快,也可能同時拿到2把鎖,然後程式最終會結束執行。

根據上面例子的理解,可以直到,死鎖產生的必須具備以下4個條件:

  1. 互斥條件:資源已經被獲取到並使用,其它請求者只能等待
  2. 請求並持有條件:一個執行緒已經持有了至少一個資源,但又提出了新的資源請求,但是新資源已經被其它執行緒佔有。
  3. 不可剝奪條件:執行緒獲取到的資源沒使用完時不能被其它執行緒搶佔。
  4. 環路等待條件:存線上程的環形鏈,即執行緒集合{T0,T1,T2,...,TN}中的T0等待T1,T1等待T2,....,TN等待T0。

wait/notify/notifyAll

wait:等待,是Object類中的一個方法,作用是當前的執行緒釋放自己的鎖標記,並且讓出CPU資源,使當前的執行緒進入等待佇列中。

notify:通知,是Object類中的一個方法,作用是喚醒等待佇列中的一個執行緒,是這個執行緒進入鎖池。

notifyAll:通知,是Object類中的一個方法,作用是喚醒等待佇列中的所有執行緒,是這些執行緒進入鎖池。

解決死鎖版本1:

import java.util.concurrent.locks.ReentrantLock;

class Test {
    public static void main(String[] args) {
        ReentrantLock mylock=new ReentrantLock();
        Runnable r1=()->{
                synchronized ("A"){
                    System.out.println("執行緒1持有了鎖A");
                    try {
                        "A".wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized ("B"){
                        System.out.println("執行緒1同時持有了鎖A和鎖B");
                    }
                }
        };

        Runnable r2=()->{
                synchronized ("B"){
                    System.out.println("執行緒2持有了鎖B");
                    synchronized ("A"){
                        System.out.println("執行緒2同時持有了鎖A和鎖B");
                    }
                }
        };
        Thread t1=new Thread(r1);
        Thread t2=new Thread(r2);
        t1.start();
        t2.start();
    }
}

執行介面:

可以看到,加上了"A".wait()後,執行緒2同時持有了鎖A和鎖B,但是程式依然還不能結束執行,因為執行緒1進入了等待佇列。此時,我們要加上notifyAll或者notify來解決。

版本2:

import java.util.concurrent.locks.ReentrantLock;

class Test {
    public static void main(String[] args) {
        ReentrantLock mylock=new ReentrantLock();
        Runnable r1=()->{
                synchronized ("A"){
                    System.out.println("執行緒1持有了鎖A");
                    try {
                        "A".wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized ("B"){
                        System.out.println("執行緒1同時持有了鎖A和鎖B");
                    }
                }
        };

        Runnable r2=()->{
                synchronized ("B"){
                    System.out.println("執行緒2持有了鎖B");
                    synchronized ("A"){
                        System.out.println("執行緒2同時持有了鎖A和鎖B");
                        "A".notify();
                    }
                }
        };
        Thread t1=new Thread(r1);
        Thread t2=new Thread(r2);
        t1.start();
        t2.start();
    }
}

執行介面:

可以看到,問題已經解決了。

其實,我們還可以在設計程式之初,不依靠waitnotify這些方法來避免死鎖的。這個方法就是申請資源的有序性。

比如說,執行緒A和執行緒B,只需要保證兩個執行緒獲取資源的順序一致,就可以避免死鎖,如執行緒A和執行緒B都需要資源1,2,3,4,....n,但是都只有在獲取了資源n-1的時候才能去獲取資源n。

多執行緒模式下的懶漢式單例

單執行緒模式下的懶漢式單例:

import java.util.concurrent.locks.ReentrantLock;

class Test {
    public static void main(String[] args) {
       for(int i=0;i<100;i++){
           Boss.getBoss();
       }
    }
}


class Boss{
    private Boss(){
        System.out.println("一個Boss物件被例項化了");
    }
    private static Boss instance=null;
    public static Boss getBoss(){
        if(instance==null){
             instance=new Boss();
        }
        return instance;
    }
}

執行結果:

可以看到,只例項化了1個Boss物件。

但是,在多執行緒模式下,如果不加鎖,就會出問題,看下列程式碼:

import java.util.concurrent.locks.ReentrantLock;

class Test {
    public static void main(String[] args) {
       Runnable r=()->{
           Boss.getBoss();
       };
       for (int i=0;i<100;i++){
           new Thread(r).start();
       }
    }
}


class Boss{
    private Boss(){
        System.out.println("一個Boss物件被例項化了");
    }
    private static Boss instance=null;
    public static Boss getBoss(){
        if(instance==null){
             instance=new Boss();
        }
        return instance;
    }
}

執行結果:

可以看到,多個Boss物件被例項化了,這個就是多執行緒下的單例問題。

下面,我們使用加鎖來解決這個問題:

import java.util.concurrent.locks.ReentrantLock;

class Test {
    public static void main(String[] args) {
       Runnable r=()->{
           Boss.getBoss();
       };
       for (int i=0;i<100;i++){
           new Thread(r).start();
       }
    }
}


class Boss{
    private Boss(){
        System.out.println("一個Boss物件被例項化了");
    }
    private static Boss instance=null;
    public static Boss getBoss(){
        synchronized ("A"){
            if(instance==null){
                instance=new Boss();
            }
            return instance;
        }
    }
}

執行結果:

執行緒的上下文切換

多執行緒程式設計中 , 執行緒個數一般都大於cpu個數。而每個cpu同一時刻只能被一個執行緒使用。為了讓使用者感覺多個執行緒是在同時進行的。CPU資源的分配採用了時間片輪轉的策略。也就是給每個執行緒分配一個時間片。執行緒在時間片內佔用cpu執行任務,當前執行緒使用完時間片後就轉為就緒狀態,並讓出cpu給其他執行緒佔用,這就是上下文切換。

從當前執行緒的上下文切換到了其他執行緒。那麼就有一個問題,讓出cpu的執行緒,等下次輪到自己佔有cpu時,如何知道自己之前執行到哪裡了?所以在切換執行緒上下文時。需要儲存當前執行緒的執行現場。當再次執行時,根據儲存的執行現場資訊,恢復執行現場。

執行緒上下文切換的時機有: 當前執行緒的cpu時間片使用完處於就緒狀態時。當前執行緒被其他執行緒中斷時。

守護執行緒與使用者執行緒

Java 中的執行緒分為兩類,分別為 daemon 執行緒(守護執行緒〉user 執行緒(使用者執行緒)

守護執行緒是什麼?面試:守護執行緒是執行在後臺的一種特殊執行緒。它獨立於控制終端並且週期性地執行某種任務或等待處理某些發生的事件。在 Java 中垃圾回收執行緒就是特殊的守護執行緒。

在 JVM 啟動時會呼叫 main 函式, main 函式所在的錢程就是一個使用者執行緒,其實在 JVM 內部同時-還啟動了好多守護執行緒, 比如垃圾回收執行緒。那麼守護執行緒和使用者執行緒有什麼區 別呢?

區別之一是當最後一個非守護執行緒結束時, JVM 會正常退出,而不管當前是否有守護執行緒,也就是說守護執行緒是否結束並不影響 JVM的退出。言外之意,只要有一個用 戶執行緒還沒結束, 正常情況下JVM就不會退出。

建立守護執行緒的方式:使用setDaemon()方法

class Test{
    public static void main(String[] args) {
        Thread daemonThread=new Thread(new Runnable() {
            @Override
            public void run() {
                for(;;){}
            }
        });
        daemonThread.setDaemon(true);
        daemonThread.start();
    }
}

總結: 如果你希望在主執行緒結束後 JVM 程序馬上結束,那麼在建立執行緒時可以將其設定為守護執行緒,如果你希望在主執行緒結束後子執行緒繼續工作,等子執行緒結束後再讓JVM 程序結束,那麼就將子執行緒設定為使用者執行緒。

ThreadLocal

ThreadLocal介紹

多錢程訪問同一個共享變數時特別容易出現併發問題,特別是在多個執行緒需要對一個 共享變數進行寫入時。 為了保證執行緒安全,一般使用者在訪問共享變數時需要進行適當的 同步,如圖 1-3 所示。

同步的措施一般是加鎖。

但有一種特殊的應用場景:當建立一個變數後, 每個執行緒對其進行訪問的時候訪問的是自己執行緒的變數。

上面這種場景不會產生執行緒安全問題,實現這種場景的方法就是ThreadLocal

ThreadLocalJDK 包提供的,它提供了執行緒本地變數,也就是如果你建立了 一個 ThreadLocal 變數,那麼訪問這個變數的每個執行緒都會有這個變數的一個本地副本。 當多個執行緒操作這個變數時,實際操作的是自己本地記憶體裡面的變數,從而避免了執行緒安全問題。建立一個 ThreadLocal 變數後,每個執行緒都會複製一個變數到自己的本地記憶體,如圖 1-4 所示。

ThreadLocal程式碼演示

我們來看一下下列程式碼,使用ThreadLocal來實現每個執行緒訪問的都是自己執行緒的變數。

class ThreadLocalTest{
    static ThreadLocal<String> localVariable=new ThreadLocal<String>();
    static void myPrint(String str){
        System.out.println(str+":"+localVariable.get());
        localVariable.remove();
    }

    public static void main(String[] args) {
        Thread threadOne=new Thread(new Runnable() {
            @Override
            public void run() {
                localVariable.set("one");
                myPrint("threadOne");
                System.out.println("threadOne remove後的值"+":"+localVariable.get());
            }
        });
        Thread threadTwo=new Thread(new Runnable() {
            @Override
            public void run() {
                localVariable.set("two");
                myPrint("threadTwo");
                System.out.println("threadTwo remove後的值"+":"+localVariable.get());
            }
        });
        threadOne.start();
        threadTwo.start();
    }
}

執行結果如下,兩個執行緒修改和訪問的是自己執行緒的變數,不會相互影響。

程式碼中首先建立了一個ThreadLocal變數,然後建立了執行緒One,Two並啟動。

執行緒One通過set()方法,修改變數的值,這個其實修改的是本地記憶體中的一個副本,這個副本Two是訪問不了的。

remove()可以移除自己執行緒的本次記憶體中的值。

那ThreadLocal的原理是啥?為什麼能夠實現每個執行緒只訪問自己執行緒的變數?

ThreadLocal的原理

Thread 類中有一個 threadLocals 和一個 inheritableThreadLocals, 它們都是ThreadLocalMap 型別的變數, 而 ThreadLocalMap 是一個定製化的 Hashmap

在預設情 況下, 每個執行緒中的這兩個變數都為 null,只有當前執行緒第一次呼叫 ThreadLocalset 或 者 get 方法時才會建立它們。 其實每個執行緒的本地變數不是存放在 ThreadLocal 例項裡面, 而是存放在呼叫執行緒的 threadLocals 變數裡面。 也就是說, ThreadLocal 型別的本地變數存放在具體的執行緒記憶體空間中。

ThreadLocal 就是一個工具,它通過 set 方法把 value 值放 入呼叫執行緒的 threadLocals 裡面並存放起來, 當呼叫執行緒呼叫它的 get 方法時,再從當前 執行緒的 threadLocals 變數裡面將其拿出來使用 。 如果呼叫執行緒一直不終止, 那麼這個本地 變數會一直存放在呼叫執行緒的 threadLocals 變數裡面,所以當不需要使用本地變數時可以 通過呼叫 ThreadLocal 變數的 remove方法,從當前執行緒的 threadLocals裡面刪除該本地變數。

另外, Thread 裡面的 threadLocals 為何被設計為 map 結構?很明顯是因為每個執行緒可以關聯多個 ThreadLocal 變數。

我們通過原始碼來檢視一下setgetremove三個方法的實現邏輯:

    public void set(T value) {
        //獲取當前執行緒
        Thread t = Thread.currentThread();
        //將當前執行緒作為 key,去查詢對應的執行緒變數,找到則設定 
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            //第一次呼叫就建立當前執行緒對應的HashMap 
            createMap(t, value);
    }

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

	public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }

總結 :如圖 1-6 所示, 在每個執行緒內部都有一個名 為 threadLocals 的成員變數, 該變數的型別為 HashMap, 其中 key 為我們定義的 ThreadLocal 變數的 this 引用 , value 則為我 們使用 set 方法設定的值

每個執行緒的本地變數存放線上程自己的記憶體變數 threadLocals 中, 如果當前執行緒一直不消亡, 那麼這些本地變數會一直存在, 所以可能會造成記憶體溢位, 因 此使用完畢後要記得呼叫 ThreadLocal 的 remove 方法刪除對應執行緒的 threadLocals 中的本地變數。

ThreadLocal不支援繼承性

ThreadLocal是不支援繼承性的,觀察下列程式碼:

class Test{
    static ThreadLocal<String> threadLocal=new ThreadLocal<>();
    public static void main(String[] args) {
        threadLocal.set("hello world");
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("子執行緒:"+threadLocal.get());
            }
        }).start();
        System.out.println("主執行緒:"+threadLocal.get());
    }


}

輸出結果如下:

這個結果其實符合我們的猜想,因為程式碼中呼叫set方法的是main執行緒,main執行緒和子執行緒是不同的執行緒,所以子執行緒列印了null,而主執行緒列印了自己設定的值。

那麼,如果我們想要讓子執行緒能訪問父執行緒的值,如何實現?這時候就到InheritableThreadLocal出場了。

InheritableThreadLocal 支援繼承性

InheritableThreadLocal 繼承自 ThreadLocal, 其提供了一個特性,就是讓子執行緒可以訪問在父執行緒中設定的本地變數。

詳情看書本。

什麼是多執行緒併發程式設計

首先要澄清併發和並行的概念,併發是指同一個時間段內多個任務同時都在執行,並且都沒有執行結束,而並行是說在單位時間 內多個任務同時在執行。 併發任務強調在一個 時間段內同時執行,而一個時間段由多個單位時間累積而成,所以說併發的多個任務在單位時間內不一定同時在執行。 在單 CPU 的時代多個任務都是併發執行的,這是因為單個 CPU 同時只能執行一個任務。 在單 CPU 時代多工是共享一個 CPU 的,當一個任務佔用 CPU 執行時,其他任務就會被掛起,當佔用 CPU 的任務時間片用完後,會把 CPU 讓給其 他任務來使用,所以在單 CPU 時代多執行緒程式設計是沒有太大意義的,並且執行緒間頻繁的上 下文切換還會帶來額外開銷。

下圖所示為在單個 CPU 上執行兩個執行緒,執行緒 A 和執行緒 B 是輪流使用 CPU 進行任 務處理的,也就是在某個時間內單個 CPU 只執行一個執行緒上面的任務。 當執行緒 A 的時間 片用完後會進行執行緒上下文切換,也就是儲存當前執行緒 A 的執行上下文,然後切換到線 程 B 來佔用 CPU 執行任務。

圖 2-2 所示為雙 CPU 配置,執行緒 A 和執行緒 B 各自在自己的 CPU 上執行任務,實現了 真正的並行執行。

而在多執行緒程式設計實踐中,執行緒的個數往往多於 CPU 的個數,所 以一般都稱多執行緒併發程式設計而不是多執行緒並行程式設計。

為什麼要進行多執行緒併發程式設計?

多核 CPU 時代的到來打破了單核 CPU 對多執行緒效能的限制。 多個 CPU 意味著每個 執行緒可以使用自己的 CPU 執行,這減少了執行緒上下文切換的開銷,但隨著對應用系統性 能和吞吐量要求的提高,出現了處理海量資料和請求的要求,這些都對高併發程式設計有著迫 切的需求。

執行緒安全問題

執行緒安全問題,是指多個執行緒同時讀寫一個共享資源並且沒有任何同步措施時,導致出現髒資料或者其它不可預見的結果的問題。

是不是說多個執行緒共享了資源, 當它們都去訪問這個共享資源時就會產生執行緒安全問題呢?答案是否定的, 如果多個執行緒都只是讀取共享資源, 而不去修改,那麼就不會存線上程安全問題, 只有當至少一個執行緒修改共享資源時才會存線上程安全問題。

最典型的就是計數器類的實現,計數變數 count 本身是一個共享變數, 多個執行緒可以對其進行遞增操作,如果不使用同步措施, 由於遞增操作是獲取一計算一儲存三步操作, 因此可能導致計數不準確,如下所示。

如何解決執行緒安全問題?可以使用關鍵字synchronized加鎖進行同步。

共享變數的記憶體可見性問題

共享變數的記憶體可見性問題可以理解為執行緒安全問題的一種。

談到記憶體可見性, 我們首先來看看在多執行緒下處理共享變數Java 的記憶體模型,如圖 2-4 所示。

Java 記憶體模型規定,將所有的變數都存放在主記憶體中當執行緒使用變數時,會把主記憶體裡面的變數複製到自己的工作空間或者叫作工作記憶體,執行緒讀寫變數時操作的是自己工作記憶體中的變數

Java 記憶體模型是一個抽象的概念,那麼在實際實現中執行緒的工作記憶體是什麼呢?請看圖 2-5。

圖中所示是一個雙核 CPU 系統架構,每個核有自己的控制器和運算器,其中控制器包含一組暫存器和操作控制器,運算器執行算術邏輔運算。每個核都有自己的一級快取, 在有些架構裡面還有一個所有 CPU 都共享的二級快取。 那麼 Java 記憶體模型裡面的工作內 存,就對應這裡的 Ll 或者 L2 快取或者 CPU 的暫存器

當一個執行緒操作共享變數時, 它首先從主記憶體複製共享變數到自己的工作記憶體, 然後對工作記憶體裡的變數進行處理, 處理完後將變數值更新到主記憶體。

那麼假如執行緒 A 和執行緒 B 同時處理一個共享變數, 會出現什麼情況?我們使用圖 2-5 所示 CPU 架構, 假設執行緒 A 和執行緒 B 使用不同 CPU 執行,並且當前兩級 Cache 都為空, 那麼這時候由於 Cache 的存在,將會導致記憶體不可見問題, 具體看下面的分析。

  1. 執行緒 A 首先獲取共享變數 X 的值,由於兩級 Cache 都沒有命中(沒找到變數X) ,所以載入主記憶體 中 X 的值,假如為 0。然後把 X=0的值快取到兩級快取, 執行緒 A 修改 X 的值為 1, 然後將其寫入兩級 Cache, 並且重新整理到主記憶體。 執行緒 A 操作完畢後,執行緒 A 所在的 CPU 的兩級 Cache 內和主記憶體裡面的 X 的值都是 l 。
  2. 執行緒 B 獲取 X 的值,首先一級快取沒有命中,然後看二級快取,二級快取命中了 , 所以返回 X= 1 ; 到這裡一切都是正常的, 因為這時候主記憶體中也是 X=l 。然後線 程 B 修改 X 的值為 2, 並將其存放到執行緒 2 所在的一級 Cache 和共享二級 Cache 中, 最後更新主記憶體中 X 的值為 2 ; 到這裡一切都是好的。
  3. 執行緒 A 這次又需要修改 X 的值, 獲取時一級快取命中, 並且 X=l ,到這裡問題就出現了,明明執行緒 B 已經把 X 的值修改為了 2,為何執行緒 A 獲取的還是 l 呢? 這就是共享變數的記憶體不可見問題, 也就是執行緒 B 寫入的值對執行緒 A 不可見。

那麼如何解決共享變數記憶體不可見問題? 使用 Java 中的 volatile 關鍵字或者synchronized關鍵字就可以解決這 個問題, 下面會有講解。

synchronized關鍵字

synchronized 塊是 Java 提供的一種原子性內建鎖, Java 中的每個物件都可以把它當作 一個同步鎖來使用 , 這些 Java 內建的使用者看不到的鎖被稱為內部鎖,也叫作監視器鎖。 執行緒的執行程式碼在進入 synchronized 程式碼塊前會自動獲取內部鎖,這時候其他執行緒訪問該同步程式碼塊時會被阻塞掛起。拿到內部鎖的執行緒會在正常退出同步程式碼塊或者丟擲異常後或者在同步塊內呼叫了該內建鎖資源的 wait 系列方法時釋放該內建鎖。 內建鎖是排它鎖, 也就是當一個執行緒獲取這個鎖後, 其他執行緒必須等待該執行緒釋放鎖後才能獲取該鎖。

另外,由於 Java 中的執行緒是與作業系統的原生執行緒一一對應的,所以當阻塞一個執行緒時,需要從使用者態切換到核心態執行阻塞操作,這是很耗時的操作,而 synchronized 的 使用就會導致上下文切換

前面介紹了共享變數記憶體可見性問題主要是由於執行緒的工作記憶體導致的,下面我們來講解 synchronized 的一個記憶體語義,這個記憶體語義就可以解決共享變數記憶體可見性問題。 進入 synchronized 塊的記憶體語義是把在 synchronized 塊內使用到的變數從執行緒的工作記憶體中清除,這樣在 synchronized 塊內使用到該變數時就不會從執行緒的工作記憶體中獲取,而是直接從主記憶體中獲取。 退出 synchronized 塊的記憶體語義是把在 synchronized 塊內對共享變數的修改重新整理到主記憶體。

其實這也是加鎖和釋放鎖的語義當獲取鎖後會清空鎖塊內本地記憶體中將會被用到的共享變數,在使用這些共享變數時從主記憶體進行載入,在釋放鎖時將本地記憶體中修改的共享變數重新整理到主記憶體。

除可以解決共享變數記憶體可見性問題外, synchronized 經常被用來實現原子性操作。 另外請注意, synchronized 關鍵字會引起執行緒上下文切換並帶來執行緒排程開銷。

總結:synchronized關鍵字可解決共享變數記憶體可見性問題、實現原子性操作,但是會引起執行緒上下文切換並帶來執行緒排程開銷。

volatile 關鍵字

上面介紹了使用鎖的方式可以解決共享變數記憶體可見性問題,但是使用鎖太笨重,因 為它會帶來執行緒上下文的切換開銷。 對於解決記憶體可見性問題, Java 還提供了一種弱形式的同步,也就是使用 volatile 關鍵字。 該關鍵字可以確保對一個變數的更新對其他執行緒馬上可見。 當一個變數被宣告為 volatile 時,執行緒在寫入變數時不會把值快取在暫存器或者其他地方,而是會把值重新整理回主記憶體。 當其他執行緒讀取該共享變數時,會從主記憶體重新獲取最新值,而不是使用當前執行緒的工作記憶體中的值。 volatile 的記憶體語義和 synchronized 有 相似之處,具體來說就是,當執行緒寫入了 volatile 變數值時就等價於執行緒退出 synchronized 同步塊(即把寫入工作記憶體的變數值同步到主記憶體),讀取 volatile 變數值時就相當於進入同 步塊 (即先清空本地記憶體變數值,再從主記憶體獲取最新值)。

下面看一個使用 volatile 關鍵字解決記憶體可見性問題的例子。 如下程式碼中的共享變數value 是執行緒不安全的,因為這裡沒有使用適當的同步措施。

首先來看使用 synchronized 關鍵宇進行同步的方式。

public class ThreadSafeinteger { 
	private int value; 
	public synchronized int get() { 
		return value; 
    }
	public synchronized void set (int value) { 
		this.value = value; 
    }
}

然後是使用 volatile 進行同步。

public class ThreadSafeinteger { 
	private volatile int value; 
	public int get(){
        return value; 
    } 
	publiC void set (int value) { 
        this.value = value; 
    }
}

在這裡使用 synchronized 和使用 volatile 是等價的,都解決了共享變數 value 的記憶體可見性問題,但是前者是獨佔鎖,同時只能有一個執行緒呼叫 get()方法,其他呼叫執行緒會被阻塞, 同時會存線上程上下文切換和執行緒重新排程的開銷,這也是使用鎖方式不好的地方。 而後者是非阻塞演算法, 不會造成執行緒上下文切換的開銷。

但並非在所有情況下使用它們都是等價的, volatile 雖然提供了可見性保證,但並不保證操作的原子性。那麼一般在什麼時候才使用 volatile 關鍵字呢?寫入變數值不依賴、變數的當前值時。 因為如果依賴當前值,將是獲取一計算一寫入 三步操作,這三步操作不是原子性的,而 volatile 不保證原子性。讀寫變數值時沒有加鎖。 因為加鎖本身已經保證了記憶體可見性,這時候不需要把變數宣告為 volatile 的。

Java 中的原子性操作

所謂原子性操作,是指執行一系列操作時,這些操作要麼全部執行, 要麼全部不執行, 不存在只執行其中一部分的情況

比如,在設計計數器時一般都先讀取當前值,然後+1, 再更新。 這個過程是讀改寫的過程,如果不能保證這個過程是原子性的,那麼就會出現執行緒安全問題。 如下程式碼是執行緒不安全的,因為不能保證++value 是原子性操作。

public class ThreadNotSaf eCount { 
	private Long value; 
	publiC Long getCount () {
        return value; 
	public void inc () { 
        ++value; 
    }
}

使用 Javap -c 命令檢視彙編程式碼,如下所示。

publ inC void inc() ; 
	Code:
	0: aload_0 
    1: dup 
    2: getfield #2  //Field value:J 
    5: lconst_1 
    6: ladd 
    7: putfield #2  //Field value:J
   10: return 

由此可見,簡單的++value 由 2、 5、 6、 7 四步組成,其中第 2 步是獲取當前 value 的 值並放入棧頂, 第 5 步把常量 1 放入棧頂,第 6 步把當前棧頂中兩個值相加並把結果放入 棧頂,第 7 步則把棧頂的結果賦給 value 變數。因此, Java 中簡單的一句++value 被轉換為彙編後就不具有原子性了 。

那麼如何才能保證多個操作的原子性呢?最簡單的方法就是使用 synchronized 關鍵字 進行同步,修改程式碼如下。

public class ThreadNotSaf eCount { 
	private Long value; 
	publiC synchronized Long getCount () {
        return value; 
	public synchronized void inc () { 
        ++value; 
    }
}

使用 synchronized 關鍵宇的確可以實現執行緒安全性, 即記憶體可見性和原子性,但是 synchronized 是獨佔鎖,沒有獲取內部鎖的執行緒會被阻塞掉,而這裡的 getCount 方法只是 讀操作,多個執行緒同時呼叫不會存線上程安全問題。 但是加了關鍵宇 synchronized 後,同 一時間就只能有一個執行緒可以呼叫,這顯然大大降低了併發性。 你也許會間,既然是只讀操作,那為何不去掉 getCount 方法上的 synchronized 關鍵字呢?其實是不能去掉的,別忘了這裡要靠 synchronized 來實現 value 的記憶體可見性。那麼有沒有更好的實現呢?答案是肯定的,下面將講到的在內部使用非阻塞 CAS 演算法實現的原子性操作類AtomicLong就是 一個不錯的選擇。

CAS(compare and swap)

CAS介紹

在 Java 中 , 鎖在併發處理中佔據了 一席之地,但是使用鎖有一個不好的地方,就 是當一個執行緒沒有獲取到鎖時會被阻塞掛起, 這會導致執行緒上下文的切換和重新排程開銷。 Java 提供了非阻塞的 volatile 關鍵字來解決共享變數的可見性問題, 這在一定程度 上彌補 了 鎖帶來的開銷 問題,但是 volatile 只能保證共享變數的可見性,不能解決讀 改一寫等的原子性問題。

CAS 即 Compare and Swap,其是 JDK 提供的非阻塞原子性操 作, 它通過硬體保證了比較更新操作的原子性。 JDK 裡面的 Unsafe 類提供了一系列的 compareAndSwap*方法, 下面以 compareAndSwapLong 方法為例進行簡單介紹。

  • boolean compareAndSwapLong(Object obj,long valueOffset,long expect, long update)方 法 : 其中 compareAndSwap 的意思是比較並交換。 CAS 有四個運算元, 分別為 : 物件記憶體位置物件中的變數的偏移量變數預期值新的值。 其操作含義是, 如果物件 obj 中記憶體偏移量為 valueOffset 的變數值為 expect,則使用新的值 update 替換舊的值 expect。 這是處理器提供的一個原子性指令。

關於 CAS 操作有個經典的 ABA 問題, 具體如下: 假如執行緒 I 使用 CAS 修改初始值 為 A 的變數 X, 那麼執行緒 I 會首先去獲取當前變數 X 的值(為 A〕, 然後使用 CAS 操作嘗 試修改 X 的值為 B, 如果使用 CAS 操作成功了 , 那麼程式執行一定是正確的嗎?其實未必, 這是因為有可能線上程 I 獲取變數 X 的值 A 後,在執行 CAS 前,執行緒 II 使用 CAS 修改 了變數 X 的值為 B,然後又使用 CAS 修改了變數 X 的值為 A。 所以雖然執行緒 I 執行 CAS 時 X 的值是 A, 但是這個 A 己經不是執行緒 I 獲取時的 A 了 。 這就是 ABA 問題。

ABA 問題的產生是因為變數的狀態值產生了環形轉換,就是變數的值可以從 A 到 B, 然後再從 B 到 A。如果變數的值只能朝著一個方向轉換,比如 A 到 B , B 到 C, 不構成環 形,就不會存在問題。 JDK 中的 AtomicStampedReference 類給每個變數的狀態值都配備了 一個時間戳, 從而避免了 ABA 問題的產生。

CAS的強大之處

CAS相對於synchronized的優勢是:CAS是非阻塞的,速度更快。

下面我們來舉個例子感受一下CAS的速度。

例子的需求是:模擬100個使用者(執行緒),每個使用者訪問10次網站,然後統計所有使用者的訪問次數。

版本1:既不使用synchronized,也不使用CAS

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class Test{
    static int count=0;
    public static void request() throws InterruptedException {
        //模擬耗時5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
        count++;
    }

    public static void main(String[] args) throws InterruptedException {
        long startTime=System.currentTimeMillis();
        int threadSize=100;
        CountDownLatch countDownLatch=new CountDownLatch(threadSize);
        for(int i=0;i<threadSize;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for(int j=0;j<10;j++){
                            request();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        long endTime=System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ",耗時:"+(endTime-startTime)+",count="+count);
    }
}

執行結果:

根據我們的例子需求,最終得到的訪問次數count應該為100*10=1000,但是從上面的執行結果可以看到,count的值是992,而且每一次執行的結果都可能不一樣,這個就是沒有實現原子性的後果。耗時為 66毫秒。

補充知識點:CountDownLatch

Java的concurrent包裡面的CountDownLatch其實可以把它看作一個計數器,只不過這個計數器的操作是原子操作,同時只能有一個執行緒去操作這個計數器,也就是同時只能有一個執行緒去減這個計數器裡面的值。

可以向CountDownLatch物件設定一個初始的數字作為計數值,任何呼叫這個物件上的await()方法的執行緒都會阻塞,直到這個計數器的計數值被其他的執行緒減為0為止。

CountDownLatch的一個非常典型的應用場景是:有一個任務想要往下執行,但必須要等到其他的任務執行完畢後才可以繼續往下執行。假如我們這個想要繼續往下執行的任務呼叫一個CountDownLatch物件的await()方法,其他的任務執行完自己的任務後呼叫同一個CountDownLatch物件上的countDown()方法,這個呼叫await()方法的任務將一直阻塞等待,直到這個CountDownLatch物件的計數值減到0為止。

舉個例子,有三個工人在為老闆幹活,這個老闆有一個習慣,就是當三個工人把一天的活都幹完了的時候,他就來檢查所有工人所幹的活。記住這個條件:三個工人先全部幹完活,老闆才檢查

版本2:通過synchronized原理來保證原子性

基於上面的程式碼,除了給request方法新增synchronized關鍵字,其它都不變。

    public synchronized static void request() throws InterruptedException {
        //模擬耗時5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
        count++;
    }

執行結果:

結果實現了原子性,但是耗時很長,相對於不加鎖時,效率低了很多。這個就是synchronized的缺點。

版本3:模擬CAS原理來保證原子性

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class Test{
    volatile static int count=0;
    public static int getCount(){return count;}

    public static void request() throws InterruptedException {
        //模擬耗時5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
        int expectCount;
        while (!compareAndSwap(expectCount=getCount(),expectCount+1)){}
    }
    public static synchronized boolean compareAndSwap(int expectCount,int newCount){
        if(getCount()==expectCount){
            count=newCount;
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        long startTime=System.currentTimeMillis();
        int threadSize=100;
        CountDownLatch countDownLatch=new CountDownLatch(threadSize);
        for(int i=0;i<threadSize;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for(int j=0;j<10;j++){
                            request();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        long endTime=System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ",耗時:"+(endTime-startTime)+",count="+count);
    }
}

遞增操作本質上可以分為3步:

  1. 獲取變數值,記為A
  2. 遞增值,B=A+1
  3. 儲存值,即將B賦給count

那CAS可以理解為升級了第3步,將第3步改為了一下4個小步

  1. 獲取鎖

  2. 獲取count最新值,記為LV

  3. 判斷LV是否等於A,如果相等,則將B的值賦給count,並返回true,否則返回false

    若為false則重新回到上面的第1步:獲取變數值,然後第2步........

  4. 釋放鎖

程式碼的執行結果為:

可以看到,模擬CAS的實現原理保證了原子性,而且耗時很短。

ABA問題的程式碼演示

JUC包提供了一系列的原子性操作類,這些類都是使用非阻塞演算法CAS來實現的,相比使用鎖實現原子性操作,效能有很大提高。接下來,我們利用atomicInteger原子操作類來演示CAS的ABA問題。

import java.util.concurrent.atomic.AtomicInteger;

class Test{
    public  static AtomicInteger a=new AtomicInteger(1);
    public static void main(String[] args) {
        Thread main=new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("操作執行緒是:"+Thread.currentThread().getName()+",a的初始值:"+a.get());
                try {
                    int expectNum=a.get();
                    int newNum=expectNum+1;
                    Thread.sleep(1000); //讓主執行緒休眠1s,讓出CPU
                    boolean isCASSuccess=a.compareAndSet(expectNum,newNum);
                    System.out.println("操作執行緒是:"+Thread.currentThread().getName()+"CAS操作成功了嗎? "+isCASSuccess);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        },"主執行緒");
        Thread other=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20);  //確保Thread-main執行緒優先執行
                    a.incrementAndGet();
                    System.out.println("操作執行緒是:"+Thread.currentThread().getName()+",進行了increment操作,a值變為:"+a.get());
                    a.decrementAndGet();
                    System.out.println("操作執行緒是:"+Thread.currentThread().getName()+",進行了decrement操作,a值變為:"+a.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"干擾執行緒");
        main.start();
        other.start();
    }
}

上面程式碼,other執行緒剛開始執行就新增睡眠時間,保證讓main執行緒先執行,然後main執行緒獲取完expectNum後,也休眠,而且休眠時間更長,這時候other執行緒已經休眠完成,繼續執行,修改a的值,加1再減1。然後,main執行緒休眠完成,繼續執行(完全不知道a的值被改過了)。

執行結果如下:

總結:對於ABA問題不敏感的資料或程式,建議使用CAS來保證原子性,但是如果資料對ABA問題敏感,比如說涉及到錢或者銀行相關業務的,建議不要使用CAS,可以使用鎖或者其它方式。

JDK中還提供了AtomicStampedReference類來避免ABA問題的產生,這個類可以給每個變數的值配備一個時間戳(版本號)。程式碼演示:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

class Test{
    public  static AtomicStampedReference<Integer> a=new AtomicStampedReference<Integer>(1,1);
    public static void main(String[] args) {
        Thread main=new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("操作執行緒是:"+Thread.currentThread().getName()+",a的初始值:"+a.getReference());
                try {
                    Integer expectReference=a.getReference();
                    Integer newReference=expectReference+1;
                    Integer expectStamp=a.getStamp();
                    Integer newStamp=expectStamp+1;
                    Thread.sleep(1000); //讓主執行緒休眠1s,讓出CPU
                    boolean isCASSuccess=a.compareAndSet(expectReference,newReference,expectStamp,newStamp);
                    System.out.println("操作執行緒是:"+Thread.currentThread().getName()+"CAS操作成功了嗎? "+isCASSuccess);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        },"主執行緒");
        Thread other=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20);  //確保Thread-main執行緒優先執行
                    a.compareAndSet(a.getReference(),(a.getReference()+1),a.getStamp(),(a.getStamp()+1));
                    System.out.println("操作執行緒是:"+Thread.currentThread().getName()+",進行了increment操作,a值變為:"+a.getReference());
                    a.compareAndSet(a.getReference(),(a.getReference()-1),a.getStamp(),(a.getStamp()+1));
                    System.out.println("操作執行緒是:"+Thread.currentThread().getName()+",進行了decrement操作,a值變為:"+a.getReference());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"干擾執行緒");
        main.start();
        other.start();
    }
}

執行結果為:

可以看到,AtomicStampedReference類解決了ABA問題。

CAS的其它缺點

CAS雖然很高效的解決了原子操作問題,但是CAS仍然存在三大問題。

  1. 迴圈時間長開銷很大。
  2. 只能保證一個共享變數的原子操作。
  3. ABA問題。(前面講了)

+迴圈時間長開銷很大:

我們可以看到getAndAddInt方法執行時,如果CAS失敗,會一直進行嘗試。如果CAS長時間一直不成功,可能會給CPU帶來很大的開銷。

只能保證一個共享變數的原子操作:

當對一個共享變數執行操作時,我們可以使用迴圈CAS的方式來保證原子操作,但是對多個共享變數操作時,迴圈CAS就無法保證操作的原子性,這個時候就可以用鎖來保證原子性。

不同型別的鎖

樂觀鎖與悲觀鎖

樂觀鎖和悲觀鎖是在資料庫中引入的名詞,但是在併發包鎖裡面也引入了類似的思想, 所以這裡還是有必要講解下。

悲觀鎖指對資料被外界修改持保守態度,認為資料很容易就會被其他執行緒修改,所以在資料被處理前先對資料進行加鎖,並在整個資料處理過程中,使資料處於鎖定狀態。 悲觀鎖的實現往往依靠資料庫提供的鎖機制,即在資料庫中 ,在對資料記錄操作前給記錄加排它鎖。 如果獲取鎖失敗, 則說明資料正在被其他執行緒修改, 當前執行緒則等待或者丟擲異 常。 如果獲取鎖成功,則對記錄進行操作,然後提交事務後釋放排它鎖。

樂觀鎖是相對悲觀鎖來說的,它認為資料在一般情況下不會造成衝突,所以在訪問記錄前不會加排它鎖,而是在進行資料提交更新時,才會正式對資料衝突與否進行檢測。具體來說,根據 update 返回的行數讓使用者決定如何去做

公平鎖與非公平鎖

根據執行緒獲取鎖的搶佔機制,鎖可以分為公平鎖和非公平鎖,公平鎖表示執行緒獲取鎖的順序是按照執行緒請求鎖的時間早晚來決定的,也就是最早請求鎖的執行緒將最早獲取到鎖。 而非公平鎖則在執行時闖入,也就是先來不一定先得

ReentrantLock 提供了公平和非公平鎖的實現。

  • 公平鎖: ReentrantLock pairLock =new ReentrantLock(true)。
  • 非公平鎖: ReentrantLock pairLock =new ReentrantLock(false)。

如果建構函式不傳遞引數,則預設是非公平鎖。

例如,假設執行緒 A 已經持有了鎖,這時候執行緒 B 請求該鎖其將會被掛起。 當執行緒 A 釋放鎖後,假如當前有執行緒 C 也需要獲取該鎖,如果採用非公平鎖方式,則根據執行緒排程 策略, 執行緒 B 和執行緒 C 兩者之一可能獲取鎖,這時候不需要任何其他干涉,而如果使用公平鎖則需要把 C 掛起,讓B 獲取當前鎖。

在沒有公平性需求的前提下儘量使用非公平鎖,因為公平鎖會帶來效能開銷。

獨佔鎖與共享鎖

根據鎖只能被單個執行緒持有還是能被多個執行緒共同持有,鎖可以分為獨佔鎖和共享鎖。

獨佔鎖保證任何時候都只有一個執行緒能得到鎖ReentrantLock 就是以獨佔方式實現的。 共享鎖則可以同時由多個執行緒持有,例如ReadWriteLock 讀寫鎖,它允許一個資源可以被多執行緒同時進行讀操作。

獨佔鎖是一種悲觀鎖,由於每次訪問資源都先加上互斥鎖,這限制了併發性,因為讀操作並不會影響資料的一致性,而獨佔鎖只允許在同一時間由一個執行緒讀取資料,其他線 程必須等待當前執行緒釋放鎖才能進行讀取。

共享鎖則是一種樂觀鎖,它放寬了加鎖的條件,允許多個執行緒同時進行讀操作

可重入鎖

當一個執行緒要獲取一個被其他執行緒持有的獨佔鎖時,該執行緒會被阻塞,那麼當一個執行緒再次獲取它自己己經獲取的鎖時是否會被阻塞呢?如果不被阻塞,那麼我們說該鎖是可重入的,也就是隻要該執行緒獲取了該鎖,那麼可以無限次數(在高階篇中我們將知道,嚴 格來說是有限次數)地進入被該鎖鎖住的程式碼。

下面看一個例子,看看在什麼情況下會使用可重入鎖。

synchronized(myObject){
    //一堆程式碼
    synchronized(myObjet){ 
        //一堆程式碼
    }
}

實際上, synchronized 內部鎖和Reetrantlock都是可重入鎖可重入鎖的原理是在鎖內部維護一個執行緒標示monitor,用來標示該鎖目前被哪個執行緒佔用,然後關聯一個計數器。一開始計數器值為0, 說明該鎖沒有被任何執行緒佔用 。 當一個錢程獲取了該鎖時,計數器的值會+1 ,這時其他執行緒再來獲取該鎖時會發現鎖的所有者不是自己而被阻塞掛起。

但是當獲取了該鎖的執行緒再次獲取鎖時發現鎖擁有者是自己,就會把計數器值加+1, 當釋放鎖後計數器值 - 1 。 當計數器值為 0 時,鎖裡面的執行緒標示被重置為 null, 這時候被阻塞的執行緒會被喚醒來競爭獲取該鎖。

自旋鎖

由於 Java 中的執行緒是與作業系統中的執行緒一一對應的,所以當一個執行緒在獲取鎖(比 如獨佔鎖)失敗後,會被切換到核心狀態而被掛起。 當該執行緒獲取到鎖時又需要將其切換 到核心狀態而喚醒該執行緒。 而從使用者狀態切換到核心狀態的開銷是比較大的,在一定程度 上會影響併發效能。自旋鎖則是,當前執行緒在獲取鎖時,如果發現鎖已經被其他執行緒佔有, 它不馬上阻塞自己,在不放棄 CPU 使用權的情況下,多次嘗試獲取(預設次數是 10,可 以使用 -XX:PreBlockSpinsh 引數設定該值),很有可能在後面幾次嘗試中其他執行緒己經釋放了鎖。 如果嘗試指定的次數後仍沒有獲取到鎖則當前執行緒才會被阻塞掛起。 由此看來自 旋鎖是使用 CPU 時間換取執行緒阻塞與排程的開銷,但是很有可能這些 CPU 時間白白浪費了 。

補充:

ReentrantLock常常對比著synchronized來分析,我們先對比著來看然後再一點一點分析。

(1)synchronized是獨佔鎖,加鎖和解鎖的過程自動進行,易於操作,但不夠靈活。ReentrantLock也是獨佔鎖,加鎖和解鎖的過程需要手動進行,不易操作,但非常靈活。

(2)synchronized可重入,因為加鎖和解鎖自動進行,不必擔心最後是否釋放鎖;ReentrantLock也可重入,但加鎖和解鎖需要手動進行,且次數需一樣,否則其他執行緒無法獲得鎖。

(3)synchronized不可響應中斷,一個執行緒獲取不到鎖就一直等著;ReentrantLock可以相應中斷。

原子操作類TODO

JUC 包提供了一系列的原子性操作類,這些類都是使用非阻塞演算法 CAS 實現的,相 比使用鎖實現原子性操作這在效能上有很大提高。由於原子性操作類的原理都大致相同, 所以本章只講解最簡單的 AtomicLong 類的實現原理以及 JDK 8 中新增的 LongAdder 和 LongAccumulator 類的原理。有了這些基礎, 再去理解其他原子性操作類的實現就不會感到困難了 。

原子變數操作類:

JUC 併發包中包含有 Atomiclnteger、 AtomicLong 和 AtomicBoolean 等原子性操作類, 它們的原理類似。

JDK 8 新增的原子操作類 LongAdder的簡單介紹:

前面講過, AtomicLong 通過 CAS 提供了非阻塞的原子性操作,相比使用阻塞演算法的 同步器來說它的效能己經很好了,但是 JDK 開發組並不滿足於此。 使用 AtomicLong 時, 在高併發下大量執行緒會同時去競爭更新同一個原子變數,但是由於同時只有一個執行緒的 CAS 操作會成功,這就造成了大量執行緒競爭失敗後,會通過無限迴圈不斷進行自旋嘗試 CAS 的操作, 而這會白白浪費 CPU 資源。

因此 JDK 8 新增了一個原子性遞增或者遞減類 LongAdder 用來克服在高併發下使用 AtomicLong 的缺點

既然AtomicLong 的效能瓶頸是由於過多執行緒同時去競爭一個變數的更新而產生的,那麼如果把一個變數分解為多個變數,讓同樣多的執行緒去競爭多個資源, 是不是就解決了效能問題?是的, LongAdder 就是這個思路。 下面通過圖來理解兩者設計 的不同之處,如圖 4-1 所示。

如圖 4-1 所示,使用 AtomicLong 時,是多個執行緒同時競爭同一個原子變數。

如圖 4-2 所示,使用 LongAdder 時,則是在內部維護多個 Cell 變數,每個 Cell 裡面有一個初始值為 0 的 long 型變數,這樣,在同等併發量的情況下,爭奪單個變數更新操 作的執行緒量會減少,這變相地減少了爭奪共享資源的併發量。另外,多個執行緒在爭奪同一 個 Cell 原子變數時如果失敗了 , 它並不是在當前 Cell 變數上一直自旋 CAS 重試,而是嘗 試在其他 Cell 的變數上進行 CAS 嘗試,這個改變增加了當前執行緒重試 CAS 成功的可能性。 最後,在獲取 LongAdder 當前值時, 是把所有 Cell 變數的 value 值累加後再加上 base 返回的。

LongAdder 維護了 一個延遲初始化的原子性更新陣列(預設情況下 Cell 陣列是 null〕 和一個基值變數 base。 由於 Cells 佔用的記憶體是相對比較大的,所以一開始並不建立它, 而是在需要時建立,也就是惰性載入

當一開始判斷 Cell 陣列是 null 並且併發執行緒較少時,所有的累加操作都是對 base 變數進行的。 保持 Cell 陣列的大小為 2 的 N 次方,在初始化時 Cell 陣列中的 Cell 元素個數 為 2,數組裡面的變數實體是 Cell 型別。 Cell 型別是 AtomicLong 的一個改進,用來減少快取的爭用,也就是解決偽共享問題。

對於大多數孤立的多個原子操作進行位元組填充是浪費的,因為原子性操作都是無規律地分散在記憶體中的 (也就是說多個原子性變數的記憶體地址是不連續的), 多個原子變數被放入同一個快取行的可能性很小。 但是原子性陣列元素的記憶體地址是連續的,所以陣列內 的多個元素能經常共享快取行,因此這裡使用 @sun.misc.Contended 註解對 Cell 類進行位元組填充,這防止了陣列中多個元素共享一個快取行,在效能上是一個提升。

AQS

Java併發面試問題之談談你對AQS的理解?

一、ReentrantLock和AQS的關係

首先我們來看看,如果用java併發包下的ReentrantLock來加鎖和釋放鎖,是個什麼樣的感覺?

這個基本學過java的同學應該都會吧,畢竟這個是java併發基本API的使用,應該每個人都是學過的,所以我們直接看一下程式碼就好了:

上面那段程式碼應該不難理解吧,無非就是搞一個Lock物件,然後加鎖和釋放鎖。

你這時可能會問,這個跟AQS有啥關係?關係大了去了!因為java併發包下很多API都是基於AQS來實現的加鎖和釋放鎖等功能的,AQS是java併發包的基礎類。

舉個例子,比如說ReentrantLock、ReentrantReadWriteLock底層都是基於AQS來實現的。

那麼AQS的全稱是什麼呢?AbstractQueuedSynchronizer,抽象佇列同步器。給大家畫一個圖先,看一下ReentrantLock和AQS之間的關係。

我們來看上面的圖。說白了,ReentrantLock內部包含了一個AQS物件,也就是AbstractQueuedSynchronizer型別的物件。這個AQS物件就是ReentrantLock可以實現加鎖和釋放鎖的關鍵性的核心元件。

二、ReentrantLock加鎖和釋放鎖的底層原理

好了,那麼現在如果有一個執行緒過來嘗試用ReentrantLock的lock()方法進行加鎖,會發生什麼事情呢?

很簡單,這個AQS物件內部有一個核心的變數叫做state,是int型別的,代表了加鎖的狀態。初始狀態下,這個state的值是0。

另外,這個AQS內部還有一個關鍵變數,用來記錄當前加鎖的是哪個執行緒,初始化狀態下,這個變數是null。

接著執行緒1跑過來呼叫ReentrantLock的lock()方法嘗試進行加鎖,這個加鎖的過程,直接就是用CAS操作將state值從0變為1。

如果不知道CAS是啥的,請看上篇文章,《大白話聊聊Java併發面試問題之Java 8如何優化CAS效能?》

如果之前沒人加過鎖,那麼state的值肯定是0,此時執行緒1就可以加鎖成功。

一旦執行緒1加鎖成功了之後,就可以設定當前加鎖執行緒是自己。所以大家看下面的圖,就是執行緒1跑過來加鎖的一個過程。

其實看到這兒,大家應該對所謂的AQS有感覺了。說白了,就是併發包裡的一個核心元件,裡面有state變數、加鎖執行緒變數等核心的東西,維護了加鎖狀態。

你會發現,ReentrantLock這種東西只是一個外層的API,核心中的鎖機制實現都是依賴AQS元件的

這個ReentrantLock之所以用Reentrant打頭,意思就是他是一個可重入鎖。

可重入鎖的意思,就是你可以對一個ReentrantLock物件多次執行lock()加鎖和unlock()釋放鎖,也就是可以對一個鎖加多次,叫做可重入加鎖。

大家看明白了那個state變數之後,就知道了如何進行可重入加鎖!

其實每次執行緒1可重入加鎖一次,會判斷一下當前加鎖執行緒就是自己,那麼他自己就可以可重入多次加鎖,每次加鎖就是把state的值給累加1,別的沒啥變化。

接著,如果執行緒1加鎖了之後,執行緒2跑過來加鎖會怎麼樣呢?

我們來看看鎖的互斥是如何實現的?執行緒2跑過來一下看到,哎呀!state的值不是0啊?所以CAS操作將state從0變為1的過程會失敗,因為state的值當前為1,說明已經有人加鎖了!

接著執行緒2會看一下,是不是自己之前加的鎖啊?當然不是了,“加鎖執行緒”這個變數明確記錄了是執行緒1佔用了這個鎖,所以執行緒2此時就是加鎖失敗。

給大家來一張圖,一起來感受一下這個過程:

接著,執行緒2會將自己放入AQS中的一個等待佇列,因為自己嘗試加鎖失敗了,此時就要將自己放入佇列中來等待,等待執行緒1釋放鎖之後,自己就可以重新嘗試加鎖了

所以大家可以看到,AQS是如此的核心!AQS內部還有一個等待佇列,專門放那些加鎖失敗的執行緒!

同樣,給大家來一張圖,一起感受一下:

接著,執行緒1在執行完自己的業務邏輯程式碼之後,就會釋放鎖!他釋放鎖的過程非常的簡單,就是將AQS內的state變數的值遞減1,如果state值為0,則徹底釋放鎖,會將“加鎖執行緒”變數也設定為null!

整個過程,參見下圖:

接下來,會從等待佇列的隊頭喚醒執行緒2重新嘗試加鎖。

好!執行緒2現在就重新嘗試加鎖,這時還是用CAS操作將state從0變為1,此時就會成功,成功之後代表加鎖成功,就會將state設定為1。

此外,還要把“加鎖執行緒”設定為執行緒2自己,同時執行緒2自己就從等待佇列中出隊了。

最後再來一張圖,大家來看看這個過程。

AQS的深入學習

AQS是什麼?

AQS是鎖的底層支援,AbstractQueuedSynchronizer 抽象同步佇列簡稱 AQS,它是實現同步器的基礎元件, 併發包中鎖的底層就是使用 AQS 實現的。 另外,大多數開發者可能永遠不會直接使用 AQS,但是知道其原理對於架構設計還是很有幫助的。

AQS實現原理

AQS中 維護了一個volatile int state(代表共享資源)和一個FIFO執行緒等待佇列(多執行緒爭用資源被阻塞時會進入此佇列)。

這裡volatile能夠保證多執行緒下的可見性,當state=1則代表當前物件鎖已經被佔有,其他執行緒來加鎖時則會失敗,加鎖失敗的執行緒會被放入一個FIFO的等待佇列中,比列會被UNSAFE.park()操作掛起,等待其他獲取鎖的執行緒釋放鎖才能夠被喚醒。

另外state的操作都是通過CAS來保證其併發修改的安全性。

具體原理我們可以用一張圖來簡單概括:

AQS 中提供了很多關於鎖的實現方法,

  • getState():獲取鎖的標誌state值
  • setState():設定鎖的標誌state值
  • tryAcquire(int):獨佔方式獲取鎖。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式釋放鎖。嘗試釋放資源,成功則返回true,失敗則返回false。

這裡還有一些方法並沒有列出來,接下來我們以ReentrantLock作為突破點通過原始碼和畫圖的形式一步步瞭解AQS內部實現原理。

目錄結構

文章準備模擬多執行緒競爭鎖、釋放鎖的場景來進行分析AQS原始碼:

三個執行緒(執行緒一、執行緒二、執行緒三)同時來加鎖/釋放鎖

目錄如下:

  • 執行緒一加鎖成功時AQS內部實現
  • 執行緒二/三加鎖失敗時AQS中等待佇列的資料模型
  • 執行緒一釋放鎖及執行緒二獲取鎖實現原理
  • 通過執行緒場景來講解公平鎖具體實現原理
  • 通過執行緒場景來講解Condition中await()signal()實現原理

這裡會通過畫圖來分析每個執行緒加鎖、釋放鎖後AQS內部的資料結構和實現原理

執行緒一加鎖成功

如果同時有三個執行緒併發搶佔鎖,此時執行緒一搶佔鎖成功,執行緒二執行緒三搶佔鎖失敗,具體執行流程如下:

此時AQS內部資料為:

執行緒二執行緒三加鎖失敗:

有圖可以看出,等待佇列中的節點Node是一個雙向連結串列,這裡SIGNALNodewaitStatus屬性,Node中還有一個nextWaiter屬性,這個並未在圖中畫出來,這個到後面Condition會具體講解的。

waitStatus記錄當前執行緒等待狀態,可以為 CANCELLED (執行緒被取消了)、 SIGNAL (執行緒需要被喚醒)、 CONDITION (執行緒在條件佇列裡面等待〉、 PROPAGATE (釋 放共享資源時需要通知其他節點〕; prev 記錄當前節點的前驅節點, next 記錄當前節點的 後繼節點。

具體看下搶佔鎖程式碼實現:

java.util.concurrent.locks.ReentrantLock .NonfairSync:
static final class NonfairSync extends Sync {
    
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

這裡使用的ReentrantLock非公平鎖,執行緒進來直接利用CAS嘗試搶佔鎖,如果搶佔成功state值回被改為1,且設定物件獨佔鎖執行緒為當前執行緒。如下所示:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

執行緒二搶佔鎖失敗

我們按照真實場景來分析,執行緒一搶佔鎖成功後,state變為1,執行緒二通過CAS修改state變數必然會失敗。此時AQSFIFO(First In First Out 先進先出)佇列中資料如圖所示:

我們將執行緒二執行的邏輯一步步拆解來看:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

先看看tryAcquire()的具體實現:java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire():

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

nonfairTryAcquire()方法中首先會獲取state的值,如果不為0則說明當前物件的鎖已經被其他執行緒所佔有,接著判斷佔有鎖的執行緒是否為當前執行緒,如果是則累加state值,這就是可重入鎖的具體實現,累加state值,釋放鎖的時候也要依次遞減state值。

如果state為0,則執行CAS操作,嘗試更新state值為1,如果更新成功則代表當前執行緒加鎖成功。

執行緒二為例,因為執行緒一已經將state修改為1,所以執行緒二通過CAS修改state的值不會成功。加鎖失敗。

執行緒二執行tryAcquire()後會返回false,接著執行addWaiter(Node.EXCLUSIVE)邏輯,將自己加入到一個FIFO等待佇列中,程式碼實現如下:

java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter():

private Node addWaiter(Node mode) {    
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

這段程式碼首先會建立一個和當前執行緒繫結的Node節點,Node為雙向連結串列。此時等待對內中的tail指標為空,直接呼叫enq(node)方法將當前執行緒加入等待佇列尾部:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第一遍迴圈時tail指標為空,進入if邏輯,使用CAS操作設定head指標,將head指向一個新建立的Node節點。此時AQS中資料:

執行完成之後,headtailt都指向第一個Node元素。

接著執行第二遍迴圈,進入else邏輯,此時已經有了head節點,這裡要操作的就是將執行緒二對應的Node節點掛到head節點後面。此時佇列中就有了兩個Node節點:

addWaiter()方法執行完後,會返回當前執行緒建立的節點資訊。繼續往後執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)邏輯,此時傳入的引數為執行緒二對應的Node節點資訊:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued():

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndChecknIterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

acquireQueued()這個方法會先判斷當前傳入的Node對應的前置節點是否為head,如果是則嘗試加鎖。加鎖成功過則將當前節點設定為head節點,然後空置之前的head節點,方便後續被垃圾回收掉。

如果加鎖失敗或者Node的前置節點不是head節點,就會通過shouldParkAfterFailedAcquire方法 將head節點的waitStatus變為了SIGNAL=-1,最後執行parkAndChecknIterrupt方法,呼叫LockSupport.park()掛起當前執行緒。

此時AQS中的資料如下圖:

此時執行緒二就靜靜的待在AQS的等待佇列裡面了,等著其他執行緒釋放鎖來喚醒它。

執行緒三搶佔鎖失敗

看完了執行緒二搶佔鎖失敗的分析,那麼再來分析執行緒三搶佔鎖失敗就很簡單了,先看看addWaiter(Node mode)方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

此時等待佇列的tail節點指向執行緒二,進入if邏輯後,通過CAS指令將tail節點重新指向執行緒三

接著執行緒三呼叫enq()方法執行入隊操作,和上面執行緒二執行方式是一致的,入隊後會修改執行緒二對應的Node中的waitStatus=SIGNAL。最後執行緒三也會被掛起。此時等待佇列的資料如圖:

執行緒一釋放鎖

現在來分析下釋放鎖的過程,首先是執行緒一釋放鎖,釋放鎖後會喚醒head節點的後置節點,也就是我們現在的執行緒二,具體操作流程如下:

執行完後等待佇列資料如下:

此時執行緒二已經被喚醒,繼續嘗試獲取鎖,如果獲取鎖失敗,則會繼續被掛起。如果獲取鎖成功,則AQS中資料如圖:

接著還是一步步拆解來看,先看看執行緒一釋放鎖的程式碼:

java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這裡首先會執行tryRelease()方法,這個方法具體實現在ReentrantLock中,如果tryRelease執行成功,則繼續判斷head節點的waitStatus是否為0

前面我們已經看到過,headwaitStatueSIGNAL(-1),這裡就會執行unparkSuccessor()方法來喚醒head的後置節點,也就是我們上面圖中執行緒二對應的Node節點。

此時看ReentrantLock.tryRelease()中的具體實現:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

執行完ReentrantLock.tryRelease()後,state被設定成0,Lock物件的獨佔鎖被設定為null。此時看下AQS中的資料:

接著執行java.util.concurrent.locks.AbstractQueuedSynchronizer.unparkSuccessor()方法,喚醒head的後置節點:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

這裡主要是將head節點的waitStatus設定為0,然後解除head節點next的指向,使head節點空置,等待著被垃圾回收。

此時重新將head指標指向執行緒二對應的Node節點,且使用LockSupport.unpark方法來喚醒執行緒二

被喚醒的執行緒二會接著嘗試獲取鎖,用CAS指令修改state資料。執行完成後可以檢視AQS中資料:

此時執行緒二被喚醒,執行緒二接著之前被park的地方繼續執行,繼續執行acquireQueued()方法。

執行緒二喚醒繼續加鎖

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

此時執行緒二被喚醒,繼續執行for迴圈,判斷執行緒二的前置節點是否為head,如果是則繼續使用tryAcquire()方法來嘗試獲取鎖,其實就是使用CAS操作來修改state值,如果修改成功則代表獲取鎖成功。接著將執行緒二設定為head節點,然後空置之前的head節點資料,被空置的節點資料等著被垃圾回收

此時執行緒三獲取鎖成功,AQS中佇列資料如下:

等待佇列中的資料都等待著被垃圾回收。

執行緒二釋放鎖/執行緒三加鎖

執行緒二釋放鎖時,會喚醒被掛起的執行緒三,流程和上面大致相同,被喚醒的執行緒三會再次嘗試加鎖,具體程式碼可以參考上面內容。具體流程圖如下:

此時AQS中佇列資料如圖:

公平鎖實現原理

上面所有的加鎖場景都是基於非公平鎖來實現的,非公平鎖ReentrantLock的預設實現,那我們接著來看一下公平鎖的實現原理,這裡先用一張圖來解釋公平鎖非公平鎖的區別:

非公平鎖執行流程:

這裡我們還是用之前的執行緒模型來舉例子,當執行緒二釋放鎖的時候,喚醒被掛起的執行緒三執行緒三執行tryAcquire()方法使用CAS操作來嘗試修改state值,如果此時又來了一個執行緒四也來執行加鎖操作,同樣會執行tryAcquire()方法。

這種情況就會出現競爭,執行緒四如果獲取鎖成功,執行緒三仍然需要待在等待佇列中被掛起。這就是所謂的非公平鎖執行緒三辛辛苦苦排隊等到自己獲取鎖,卻眼巴巴的看到執行緒四插隊獲取到了鎖。

公平鎖執行流程:

公平鎖在加鎖的時候,會先判斷AQS等待佇列中是存在節點,如果存在節點則會直接入隊等待,具體程式碼如下.

公平鎖在獲取鎖是也是首先會執行acquire()方法,只不過公平鎖單獨實現了tryAcquire()方法:

#java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這裡會執行ReentrantLock中公平鎖的tryAcquire()方法

#java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire():

static final class FairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

這裡會先判斷state值,如果不為0且獲取鎖的執行緒不是當前執行緒,直接返回false代表獲取鎖失敗,被加入等待佇列。如果是當前執行緒則可重入獲取鎖。

如果state=0則代表此時沒有執行緒持有鎖,執行hasQueuedPredecessors()判斷AQS等待佇列中是否有元素存在,如果存在其他等待執行緒,那麼自己也會加入到等待佇列尾部,做到真正的先來後到,有序加鎖。具體程式碼如下:

#java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors():

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

這段程式碼很有意思,返回false代表隊列中沒有節點或者僅有一個節點是當前執行緒建立的節點。返回true則代表隊列中存在等待節點,當前執行緒需要入隊等待。

先判斷head是否等於tail,如果佇列中只有一個Node節點,那麼head會等於tail,接著判斷head的後置節點,這裡肯定會是null,如果此Node節點對應的執行緒和當前的執行緒是同一個執行緒,那麼則會返回false,代表沒有等待節點或者等待節點就是當前執行緒建立的Node節點。此時當前執行緒會嘗試獲取鎖。

如果headtail不相等,說明佇列中有等待執行緒建立的節點,此時直接返回true,如果只有一個節點,而此節點的執行緒和當前執行緒不一致,也會返回true

非公平鎖公平鎖的區別:非公平鎖效能高於公平鎖效能。非公平鎖可以減少CPU喚醒執行緒的開銷,整體的吞吐效率會高點,CPU也不必取喚醒所有執行緒,會減少喚起執行緒的數量

非公平鎖效能雖然優於公平鎖,但是會存在導致執行緒飢餓的情況。在最壞的情況下,可能存在某個執行緒一直獲取不到鎖。不過相比效能而言,飢餓問題可以暫時忽略,這可能就是ReentrantLock預設建立非公平鎖的原因之一了。

Condition實現原理

Condition 簡介

上面已經介紹了AQS所提供的核心功能,當然它還有很多其他的特性,這裡我們來繼續說下Condition這個元件。

Condition`是在`java 1.5`中才出現的,它用來替代傳統的`Object`的`wait()`、`notify()`實現執行緒間的協作,相比使用`Object`的`wait()`、`notify()`,使用`Condition`中的`await()`、`signal()`這種方式實現執行緒間協作更加安全和高效。因此通常來說比較推薦使用`Condition

其中AbstractQueueSynchronizer中實現了Condition中的方法,主要對外提供awaite(Object.wait())signal(Object.notify())呼叫。

Condition Demo示例

使用示例程式碼:

/**
 * ReentrantLock 實現原始碼學習
 * @author 一枝花算不算浪漫
 * @date 2020/4/28 7:20
 */
public class ReentrantLockDemo {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("執行緒一加鎖成功");
                System.out.println("執行緒一執行await被掛起");
                condition.await();
                System.out.println("執行緒一被喚醒成功");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("執行緒一釋放鎖成功");
            }
        }).start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("執行緒二加鎖成功");
                condition.signal();
                System.out.println("執行緒二喚醒執行緒一");
            } finally {
                lock.unlock();
                System.out.println("執行緒二釋放鎖成功");
            }
        }).start();
    }
}

執行結果如下圖:

這裡執行緒一先獲取鎖,然後使用await()方法掛起當前執行緒並釋放鎖執行緒二獲取鎖後使用signal喚醒執行緒一

Condition實現原理圖解

我們還是用上面的demo作為例項,執行的流程如下:

執行緒一執行await()方法:

先看下具體的程式碼實現,#java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject.await()

 public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

await()方法中首先呼叫addConditionWaiter()將當前執行緒加入到Condition佇列中。

執行完後我們可以看下Condition佇列中的資料:

具體實現程式碼為:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

這裡會用當前執行緒建立一個Node節點,waitStatusCONDITION。接著會釋放該節點的鎖,呼叫之前解析過的release()方法,釋放鎖後此時會喚醒被掛起的執行緒二執行緒二會繼續嘗試獲取鎖。

接著呼叫isOnSyncQueue()方法判斷當前節點是否為Condition佇列中的頭部節點,如果是則呼叫LockSupport.park(this)掛起Condition中當前執行緒。此時執行緒一被掛起,執行緒二獲取鎖成功。

具體流程如下圖:

執行緒二執行signal()方法:

首先我們考慮下執行緒二已經獲取到鎖,此時AQS等待佇列中已經沒有了資料。

接著就來看看執行緒二喚醒執行緒一的具體執行流程:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

先判斷當前執行緒是否為獲取鎖的執行緒,如果不是則直接丟擲異常。接著呼叫doSignal()方法來喚醒執行緒。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

這裡先從transferForSignal()方法來看,通過上面的分析我們知道Condition佇列中只有執行緒一建立的一個Node節點,且waitStatueCONDITION,先通過CAS修改當前節點waitStatus為0,然後執行enq()方法將當前執行緒加入到等待佇列中,並返回當前執行緒的前置節點。

加入等待佇列的程式碼在上面也已經分析過,此時等待佇列中資料如下圖:

接著開始通過CAS修改當前節點的前置節點waitStatusSIGNAL,並且喚醒當前執行緒。此時AQS中等待佇列資料為:

執行緒一被喚醒後,繼續執行await()方法中的 while 迴圈。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

因為此時執行緒一的waitStatus已經被修改為0,所以執行isOnSyncQueue()方法會返回false。跳出while迴圈。

接著執行acquireQueued()方法,這裡之前也有講過,嘗試重新獲取鎖,如果獲取鎖失敗繼續會被掛起。直到另外執行緒釋放鎖才被喚醒。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

此時執行緒一的流程都已經分析完了,等執行緒二釋放鎖後,執行緒一會繼續重試獲取鎖,流程到此終結。

Condition總結

我們總結下 Condition 和 wait/notify 的比較:

  • Condition 可以精準的對多個不同條件進行控制,wait/notify 只能和 synchronized 關鍵字一起使用,並且只能喚醒一個或者全部的等待佇列;
  • Condition 需要使用 Lock 進行控制,使用的時候要注意 lock() 後及時的 unlock(),Condition 有類似於 await 的機制,因此不會產生加鎖方式而產生的死鎖出現,同時底層實現的是 park/unpark 的機制,因此也不會產生先喚醒再掛起的死鎖,一句話就是不會產生死鎖,但是 wait/notify 會產生先喚醒再掛起的死鎖。

執行緒池

一、執行緒概述

執行緒池,顧名思義就是存放執行緒的池子,池子裡存放了很多可以複用的執行緒

如果不用類似執行緒池的容器,每當我們需要執行使用者任務的時候都去建立新的執行緒,任務執行完之後執行緒就被回收了,這樣頻繁地建立和銷燬執行緒會浪費大量的系統資源。

因此,執行緒池通過執行緒複用機制,並對執行緒進行統一管理,具有以下優點:

  • 降低系統資源消耗。通過複用已存在的執行緒,降低執行緒建立和銷燬造成的消耗;
  • 提高響應速度。當有任務到達時,無需等待新執行緒的建立便能立即執行;
  • 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗大量系統資源,還會降低系統的穩定性,使用執行緒池可以進行對執行緒進行統一的分配、調優和監控。

ThreadPoolExecutor是執行緒池框架的一個核心類,另外, 執行緒池也提供了許多可調引數和可擴充套件性介面 ,以滿足不同情境的需要,程式 員可以使用更方便的 Executors 的工廠方法, 比如 newCachedThreadPool (執行緒池執行緒個數 最多可達 Integer.MAX_VALUE,執行緒自動回收)、 newFixedThreadPool (固定大小的執行緒池) 和 newSingleThreadExecutor (單個執行緒)等來建立執行緒池,當然使用者還可以 自定義。

本文通過對ThreadPoolExecutor原始碼的分析(基於JDK 1.8),來深入分析執行緒池的實現原理。

二、ThreadPoolExecutor類的屬性

ThreadPoolExecutor繼承了AbstractExecutorService,成員變數ctl是一個Integer的原子變數,用來記錄執行緒池狀態和執行緒池中執行緒個數。

檢視ThreadPoolExecutor類的原始碼:

// 執行緒池的控制狀態,用高3位來表示執行緒池的執行狀態,低29位來表示執行緒池中工作執行緒的數量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //值為29,用來表示偏移量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //執行緒池的最大容量,其值的二進位制為:00011111111111111111111111111111(29個1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  //1的二進位制左移29位,低位補0

    // 執行緒池的執行狀態,總共有5個狀態,用高3位來表示
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    //任務快取佇列,用來存放等待執行的任務
    private final BlockingQueue<Runnable> workQueue;

    //全域性鎖,對執行緒池狀態等屬性修改時需要使用這個鎖
    private final ReentrantLock mainLock = new ReentrantLock();

    //執行緒池中工作執行緒的集合,訪問和修改需要持有全域性鎖
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 終止條件
    private final Condition termination = mainLock.newCondition();

    //執行緒池中曾經出現過的最大執行緒數
    private int largestPoolSize;
    
    //已完成任務的數量
    private long completedTaskCount;

    //執行緒工廠
    private volatile ThreadFactory threadFactory;

    //任務拒絕策略
    private volatile RejectedExecutionHandler handler;

    //執行緒存活時間
    private volatile long keepAliveTime;

    //是否允許核心執行緒超時
    private volatile boolean allowCoreThreadTimeOut;

    //核心池大小,若allowCoreThreadTimeOut被設定,核心執行緒全部空閒超時被回收的情況下會為0
    private volatile int corePoolSize;

    //最大池大小,不得超過CAPACITY
    private volatile int maximumPoolSize;

    //預設的任務拒絕策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    private final AccessControlContext acc;

在ThreadPoolExecutor類的這些屬性中,執行緒池狀態是控制執行緒池生命週期至關重要的屬性,這裡就以執行緒池狀態為出發點進行研究。

通過上面的原始碼可知,執行緒池的執行狀態總共有5種,其值和含義分別如下:

  • RUNNING: 高3位為111,接受新任務並處理阻塞佇列中的任務
  • SHUTDOWN: 高3位為000,不接受新任務但會處理阻塞佇列中的任務
  • STOP:高3位為001,不會接受新任務,也不會處理阻塞佇列中的任務,並且中斷正在執行的任務
  • TIDYING: 高3位為010,所有任務都已終止,工作執行緒數量為0,執行緒池將轉化到TIDYING狀態,即將要執行terminated()鉤子方法
  • TERMINATED: 高3位為011,terminated()方法已經執行結束

然而,執行緒池中使用一個AtomicInteger型別的變數ctl來表示執行緒池的控制狀態,其將執行緒池執行狀態與工作執行緒的數量打包在一個整型中,用高3位來表示執行緒池的執行狀態,低29位來表示執行緒池中工作執行緒的數量,對ctl的操作主要參考以下幾個函式:

// 通過與的方式,獲取ctl的高3位,也就是執行緒池的執行狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }   //~ 是位運算子 非
    //通過與的方式,獲取ctl的低29位,也就是執行緒池中工作執行緒的數量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //通過或的方式,將執行緒池狀態和執行緒池中工作執行緒的數量打包成ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    //SHUTDOWN狀態的值是0,比它大的均是執行緒池停止或清理狀態,比它小的是執行狀態
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

接下來,我們看一下執行緒池狀態的所有轉換情況,如下:

  • RUNNING -> SHUTDOWN:呼叫shutdown(),可能在finalize()中隱式呼叫
  • (RUNNING or SHUTDOWN) -> STOP:呼叫shutdownNow()
  • SHUTDOWN -> TIDYING:當快取佇列和執行緒池都為空時
  • STOP -> TIDYING:當執行緒池為空時
  • TIDYING -> TERMINATED:當terminated()方法執行結束時

通常情況下,執行緒池有如下兩種狀態轉換流程:

  1. RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
  2. RUNNING -> STOP -> TIDYING -> TERMINATED

三、ThreadPoolExecutor類的構造方法

通常情況下,我們使用執行緒池的方式就是new一個ThreadPoolExecutor物件來生成一個執行緒池。接下來,先看ThreadPoolExecutor類的建構函式:

//間接呼叫最後一個建構函式,採用預設的任務拒絕策略AbortPolicy和預設的執行緒工廠
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue);
    //間接呼叫最後一個建構函式,採用預設的任務拒絕策略AbortPolicy
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory);
    //間接呼叫最後一個建構函式,採用預設的預設的執行緒工廠
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler);
    //前面三個分別呼叫了最後一個,主要的建構函式
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);

接下來,看下最後一個建構函式的具體實現:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        //引數合法性校驗
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        //引數合法性校驗
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        //初始化對應的屬性
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

下面解釋下一下構造器中各個引數的含義:

1.corePoolSize

執行緒池中的核心執行緒數。當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於corePoolSize;如果當前執行緒數為corePoolSize,繼續提交的任務被儲存到阻塞佇列中,等待被執行。

2.maximumPoolSize

執行緒池中允許的最大執行緒數。如果當前阻塞佇列滿了,且繼續提交任務,則建立新的執行緒執行任務,前提是當前執行緒數小於maximumPoolSize。

3.keepAliveTime

執行緒空閒時的存活時間。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0。

4.unit

keepAliveTime引數的時間單位

5.workQueue

任務快取佇列(阻塞佇列),用來存放等待執行的任務。如果當前執行緒數為corePoolSize,繼續提交的任務就會被儲存到任務快取佇列中,等待被執行。

一般來說,這裡的BlockingQueue有以下三種選擇:

  • SynchronousQueue:一個不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態。因此,如果執行緒池中始終沒有空閒執行緒(任務提交的平均速度快於被處理的速度),可能出現無限制的執行緒增長。
  • LinkedBlockingQueue:基於連結串列結構的阻塞佇列,如果不設定初始化容量,其容量為Integer.MAX_VALUE,即為無界佇列。因此,如果執行緒池中執行緒數達到了corePoolSize,且始終沒有空閒執行緒(任務提交的平均速度快於被處理的速度),任務快取佇列可能出現無限制的增長。
  • ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,按FIFO排序任務。

6.threadFactory

執行緒工廠,建立新執行緒時使用的執行緒工廠

7.handler

任務拒絕策略,當阻塞佇列滿了,且執行緒池中的執行緒數達到maximumPoolSize,如果繼續提交任務,就會採取任務拒絕策略處理該任務,執行緒池提供了4種任務拒絕策略:

  • AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常,預設策略;

  • CallerRunsPolicy:由呼叫execute方法的執行緒執行該任務;

  • DiscardPolicy:丟棄任務,但是不丟擲異常;

  • DiscardOldestPolicy:丟棄阻塞佇列最前面的任務,然後重新嘗試執行任務(重複此過程)。

    當然也可以根據應用場景實現RejectedExecutionHandler介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務。

四、執行緒池的實現原理

1.提交任務

執行緒池框架提供了兩種方式提交任務,submit()execute(),通過submit()方法提交的任務可以返回任務執行的結果,通過execute()方法提交的任務不能獲取任務執行的結果。

submit()方法的實現有以下三種:

public Future<?> submit(Runnable task);
    public <T> Future<T> submit(Runnable task, T result);
    public <T> Future<T> submit(Callable<T> task);

下面以第一個方法為例簡單看一下submit()方法的實現:

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

submit()方法是在ThreadPoolExecutor的父類AbstractExecutorService類實現的,最終還是呼叫的ThreadPoolExecutor類的execute()方法,下面著重看一下execute()方法的實現。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取執行緒池控制狀態+執行緒個數變數的組合值
        int c = ctl.get();
        // (1)
        //當前執行緒池的執行緒(worker)數量是否小於corePoolSize 小於則開啟新執行緒執行
        if (workerCountOf(c) < corePoolSize) {
            //建立worker,addWorker方法boolean引數用來判斷是否建立核心執行緒
            if (addWorker(command, true))
                //成功則返回
                return;
            //失敗則再次獲取執行緒池控制狀態
            c = ctl.get();
        }
        //(2)
       //執行緒池處於RUNNING狀態,將任務加入workQueue任務快取佇列(阻塞佇列)
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次檢查,獲取執行緒池控制狀態,防止在任務入隊的過程中執行緒池關閉了或者執行緒池中沒有執行緒了
            int recheck = ctl.get();
            //執行緒池不處於RUNNING狀態,且將任務從workQueue移除成功,並執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                //採取任務拒絕策略
                reject(command);
            //worker數量等於0,即執行緒池為空,則新增一個執行緒
            else if (workerCountOf(recheck) == 0)
                //建立worker
                addWorker(null, false);
        }
        //(3)  如果佇列滿了,則新增執行緒,若新增失敗則執行拒絕策略
        else if (!addWorker(command, false))  //建立worker
            reject(command);  //如果建立worker失敗,採取任務拒絕策略
    }

execute()方法的執行流程可以總結如下:

  • 若執行緒池工作執行緒數量小於corePoolSize,則建立新執行緒來執行任務
  • 若工作執行緒數量大於或等於corePoolSize,則將任務加入BlockingQueue
  • 若無法將任務加入BlockingQueue(BlockingQueue已滿),且工作執行緒數量小於maximumPoolSize,則建立新的執行緒來執行任務
  • 若工作執行緒數量達到maximumPoolSize,則建立執行緒失敗,採取任務拒絕策略

可以結合下面的兩張圖來理解執行緒池提交任務的執行流程。(ThreadPoolExecutorexecute()方法的執行示意圖)

2.建立執行緒

execute()方法的實現可以看出,addWorker()方法主要負責建立新的執行緒並執行任務,程式碼實現如下:

//addWorker有兩個引數:Runnable型別的firstTask,用於指定新增的執行緒執行的第一個任務;boolean型別的core,表示是否建立核心執行緒
//該方法的返回值代表是否成功新增一個執行緒
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // (1)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //執行緒數超標,不能再建立執行緒,直接返回
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS操作遞增workCount
                //如果成功,那麼建立執行緒前的所有條件校驗都滿足了,準備建立執行緒執行任務,退出retry迴圈
                //如果失敗,說明有其他執行緒也在嘗試往執行緒池中建立執行緒(往執行緒池提交任務可以是併發的),則繼續往下執行
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //重新獲取執行緒池控制狀態
                c = ctl.get();
                // 如果執行緒池的狀態發生了變更,如有其他執行緒關閉了這個執行緒池,那麼需要回到外層的for迴圈
                if (runStateOf(c) != rs)
                    continue retry;
                //如果只是CAS操作失敗的話,進入內層的for迴圈就可以了
            }
        }

        //到這裡,建立執行緒前的所有條件校驗都滿足了,可以開始建立執行緒來執行任務
        //worker是否已經啟動
        boolean workerStarted = false;
        //是否已將這個worker新增到workers這個HashSet中
        boolean workerAdded = false;
        Worker w = null;
        try {
            //建立一個worker,從這裡可以看出對執行緒的包裝
            w = new Worker(firstTask);
            //取出worker中的執行緒物件,Worker的構造方法會呼叫ThreadFactory來建立一個新的執行緒
            final Thread t = w.thread;
            if (t != null) {
                //獲取全域性鎖, 併發的訪問執行緒池workers物件必須加鎖,持有鎖的期間執行緒池也不會被關閉
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //重新獲取執行緒池的執行狀態
                    int rs = runStateOf(ctl.get());

                    //小於SHUTTDOWN即RUNNING
                    //等於SHUTDOWN並且firstTask為null,不接受新的任務,但是會繼續執行等待佇列中的任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //worker裡面的thread不能是已啟動的
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                       //將新建立的執行緒加入到執行緒池中
                        workers.add(w);
                        int s = workers.size();
                        // 更新largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //執行緒新增執行緒池成功,則啟動新建立的執行緒
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //若執行緒啟動失敗,做一些清理工作,例如從workers中移除新新增的worker並遞減wokerCount
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //返回執行緒是否啟動成功
        return workerStarted;
    }

因為程式碼(1)處的邏輯不利於理解,我們通過(1)的等價實現來理解:

if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
//等價實現
rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())

其含義為,滿足下列條件之一則直接返回false,執行緒建立失敗:

  • rs > SHUTDOWN,也就是STOP,TIDYING或TERMINATED,此時不再接受新的任務,且中斷正在執行的任務
  • rs = SHUTDOWN且firstTask != null,此時不再接受任務,但是仍會處理任務快取佇列中的任務
  • rs = SHUTDOWN,佇列為空

多說一句,若執行緒池處於 SHUTDOWN, firstTask 為 null,且 workQueue 非空,那麼還得建立執行緒繼續處理任務快取佇列中的任務。

總結一下,addWorker()方法完成了如下幾件任務:

  1. 原子性的增加workerCount
  2. 將使用者給定的任務封裝成為一個worker,並將此worker新增進workers集合中
  3. 啟動worker對應的執行緒
  4. 若執行緒啟動失敗,回滾worker的建立動作,即從workers中移除新新增的worker,並原子性的減少workerCount
3.工作執行緒的實現

從addWorker()方法的實現可以看出,工作執行緒的建立和啟動都跟ThreadPoolExecutor中的內部類Worker有關。下面我們分析Worker類來看一下工作執行緒的實現。

Worker類繼承自AQS類,具有鎖的功能;實現了Runable介面,可以將自身作為一個任務線上程中執行。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

Worker的主要欄位就下面三個,程式碼也比較簡單。

//用來封裝worker的執行緒,執行緒池中真正執行的執行緒,通過執行緒工廠建立而來
        final Thread thread;
        //worker所對應的第一個任務,可能為空
        Runnable firstTask;
        //記錄當前執行緒完成的任務數
        volatile long completedTasks;

Worker的建構函式如下。

Worker(Runnable firstTask) {
            //設定AQS的state為-1,在執行runWorker()方法之前阻止執行緒中斷
            setState(-1);
            //初始化第一個任務
            this.firstTask = firstTask;
            //利用指定的執行緒工廠建立一個執行緒,注意,引數是Worker例項本身this
            //也就是當執行start方法啟動執行緒thread時,真正執行的是Worker類的run方法
            this.thread = getThreadFactory().newThread(this);
        }

Worker類繼承了AQS類,重寫了其相應的方法,實現了一個自定義的同步器,實現了不可重入鎖。

//是否持有獨佔鎖
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        //嘗試獲取鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                //設定獨佔執行緒
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //嘗試釋放鎖
        protected boolean tryRelease(int unused) {
            //設定獨佔執行緒為null
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //獲取鎖
        public void lock()        { acquire(1); }
        //嘗試獲取鎖
        public boolean tryLock()  { return tryAcquire(1); }
        //釋放鎖
        public void unlock()      { release(1); }
        //是否持有鎖
        public boolean isLocked() { return isHeldExclusively(); }

Worker類還提供了一箇中斷執行緒thread的方法。

void interruptIfStarted() {
            Thread t;
            //AQS狀態大於等於0,worker對應的執行緒不為null,且該執行緒沒有被中斷
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

再來看一下Worker類的run()方法的實現,會發現run()方法最終呼叫了ThreadPoolExecutor類的runWorker()方法。

public void run() {
            runWorker(this);
        }
4.執行緒複用機制

通過上文可以知道,worker中的執行緒start 後,執行的是worker的run()方法,而run()方法最終會呼叫ThreadPoolExecutor類的runWorker()方法,runWorker()方法實現了執行緒池中的執行緒複用機制。下面我們來看一下runWorker()方法的實現。

final void runWorker(Worker w) {
        //獲取當前執行緒
        Thread wt = Thread.currentThread();
        //獲取w的firstTask
        Runnable task = w.firstTask;
        //設定w的firstTask為null
        w.firstTask = null;
        // 釋放鎖,設定AQS的state為0,允許中斷
        w.unlock();
        //用於標識執行緒是否異常終止,finally中processWorkerExit()方法會有不同邏輯
        boolean completedAbruptly = true;
        try {
            //迴圈呼叫getTask()獲取任務,不斷從任務快取佇列獲取任務並執行
            while (task != null || (task = getTask()) != null) {
                //進入迴圈內部,代表已經獲取到可執行的任務,則對worker物件加鎖,保證執行緒在執行任務過程中不會被中斷
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||  //若執行緒池狀態大於等於STOP,那麼意味著該執行緒要中斷
                     (Thread.interrupted() &&      //執行緒被中斷
                      runStateAtLeast(ctl.get(), STOP))) &&  //且是因為執行緒池內部狀態變化而被中斷
                    !wt.isInterrupted())           //確保該執行緒未被中斷
                    //發出中斷請求
                    wt.interrupt();
                try {
                    //開始執行任務前的Hook方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //到這裡正式開始執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //執行任務後的Hook方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    //置空task,準備通過getTask()獲取下一個任務
                    task = null;
                    //completedTasks遞增
                    w.completedTasks++;
                    //釋放掉worker持有的獨佔鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //到這裡,執行緒執行結束,需要執行結束執行緒的一些清理工作
            //執行緒執行結束可能有兩種情況:
            //1.getTask()返回null,也就是說,這個worker的使命結束了,執行緒執行結束
            //2.任務執行過程中發生了異常
            //第一種情況,getTask()返回null,那麼getTask()中會將workerCount遞減
            //第二種情況,workerCount沒有進行處理,這個遞減操作會在processWorkerExit()中處理
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker()方法是執行緒池的核心,實現了執行緒池中的執行緒複用機制,來看一下

runWorker()方法都做了哪些工作:

1.執行第一個任務firstTask之後,迴圈呼叫getTask()方法獲取任務,不斷從任務快取佇列獲取任務並執行;

2.獲取到任務之後就對worker物件加鎖,保證執行緒在執行任務的過程中不會被中斷,任務執行完會釋放鎖;

3.在執行任務的前後,可以根據業務場景重寫beforeExecute()和afterExecute()等Hook方法;

4.執行通過getTask()方法獲取到的任務

5.執行緒執行結束後,呼叫processWorkerExit()方法執行結束執行緒的一些清理工作

從runWorker()方法的實現可以看出,runWorker()方法中主要呼叫了getTask()方法和processWorkerExit()方法,下面分別看一下這兩個方法的實現。

getTask()的實現

getTask()方法用來不斷地從任務快取佇列獲取任務並交給執行緒執行,下面分析一下其實現。

private Runnable getTask() {
        //標識當前執行緒是否超時未能獲取到task物件
        boolean timedOut = false;

        for (;;) {
            //獲取執行緒池的控制狀態
            int c = ctl.get();
            //獲取執行緒池的執行狀態
            int rs = runStateOf(c);

            //如果執行緒池狀態大於等於STOP,或者處於SHUTDOWN狀態,並且阻塞佇列為空,執行緒池工作執行緒數量遞減,方法返回null,回收執行緒
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            
            //獲取worker數量
            int wc = workerCountOf(c);

            //標識當前執行緒在空閒時,是否應該超時回收
            // 如果allowCoreThreadTimeOut為ture,或當前執行緒數大於核心池大小,則需要超時回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //如果worker數量大於maximumPoolSize(有可能呼叫了 setMaximumPoolSize(),導致worker數量大於maximumPoolSize)
            if ((wc > maximumPoolSize || (timed && timedOut))  //或者獲取任務超時
                && (wc > 1 || workQueue.isEmpty())) {  //workerCount大於1或者阻塞佇列為空(在阻塞佇列不為空時,需要保證至少有一個工作執行緒)
                if (compareAndDecrementWorkerCount(c))
                    //執行緒池工作執行緒數量遞減,方法返回null,回收執行緒
                    return null;
                //執行緒池工作執行緒數量遞減失敗,跳過剩餘部分,繼續迴圈
                continue;
            }

            try {
                //如果允許超時回收,則呼叫阻塞佇列的poll(),只在keepAliveTime時間內等待獲取任務,一旦超過則返回null
                //否則呼叫take(),如果佇列為空,執行緒進入阻塞狀態,無限時等待任務,直到佇列中有可取任務或者響應中斷訊號退出
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //若task不為null,則返回成功獲取的task物件
                if (r != null)
                    return r;
                // 若返回task為null,表示執行緒空閒時間超時,則設定timeOut為true
                timedOut = true;
            } catch (InterruptedException retry) {
                //如果此worker發生了中斷,採取的方案是重試,沒有超時
                //在哪些情況下會發生中斷?呼叫setMaximumPoolSize(),shutDown(),shutDownNow()
                timedOut = false;
            }
        }
    }

接下來總結一下getTask()方法會在哪些情況下返回:

1.執行緒池處於RUNNING狀態,阻塞佇列不為空,返回成功獲取的task物件

2.執行緒池處於SHUTDOWN狀態,阻塞佇列不為空,返回成功獲取的task物件

3.執行緒池狀態大於等於STOP,返回null,回收執行緒

4.執行緒池處於SHUTDOWN狀態,並且阻塞佇列為空,返回null,回收執行緒

5.worker數量大於maximumPoolSize,返回null,回收執行緒

6.執行緒空閒時間超時,返回null,回收執行緒

processWorkerExit()的實現

processWorkerExit()方法負責執行結束執行緒的一些清理工作,下面分析一下其實現。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果使用者任務執行過程中發生了異常,則需要遞減workerCount
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        //獲取全域性鎖
        mainLock.lock();
        try {
            //將worker完成任務的數量累加到總的完成任務數中
            completedTaskCount += w.completedTasks;
            //從workers集合中移除該worker
            workers.remove(w);
        } finally {
            //釋放鎖
            mainLock.unlock();
        }
        //嘗試終止執行緒池
        tryTerminate();
        //獲取執行緒池控制狀態
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {  //執行緒池執行狀態小於STOP
            if (!completedAbruptly) {  //如果使用者任務執行過程中發生了異常,則直接呼叫addWorker()方法建立執行緒
                //是否允許核心執行緒超時
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //允許核心超時並且workQueue阻塞佇列不為空,那執行緒池中至少有一個工作執行緒
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //如果工作執行緒數量workerCount大於等於核心池大小corePoolSize,
                //或者允許核心超時並且workQueue阻塞佇列不為空時,執行緒池中至少有一個工作執行緒,直接返回
                if (workerCountOf(c) >= min)
                    return;
                //若不滿足上述條件,則呼叫addWorker()方法建立執行緒
            }
            //建立新的執行緒取代當前執行緒
            addWorker(null, false);
        }
    }

processWorkerExit()方法中主要呼叫了tryTerminate()方法,下面看一下tryTerminate()方法的實現。

final void tryTerminate() {
        for (;;) {
            //獲取執行緒池控制狀態
            int c = ctl.get();
            if (isRunning(c) ||    //執行緒池的執行狀態為RUNNING
                runStateAtLeast(c, TIDYING) ||    //執行緒池的執行狀態大於等於TIDYING
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  //執行緒池的執行狀態為SHUTDOWN且阻塞佇列不為空
                //不能終止,直接返回
                return;

            //只有當執行緒池的執行狀態為STOP,或執行緒池執行狀態為SHUTDOWN且阻塞佇列為空時,可以執行到這裡
            //如果執行緒池工作執行緒的數量不為0
            if (workerCountOf(c) != 0) {
                //僅僅中斷一個空閒的worker
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            //只有當執行緒池工作執行緒的數量為0時可以執行到這裡
            final ReentrantLock mainLock = this.mainLock;
            //獲取全域性鎖
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {  //CAS操作設定執行緒池執行狀態為TIDYING,工作執行緒數量為0
                    try {
                        //執行terminated()鉤子方法
                        terminated();
                    } finally {
                        //設定執行緒池執行狀態為TERMINATED,工作執行緒數量為0
                        ctl.set(ctlOf(TERMINATED, 0));
                        //喚醒在termination條件上等待的所有執行緒
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                //釋放鎖
                mainLock.unlock();
            }
            //若CAS操作失敗則重試
        }
    }

tryTerminate()方法的作用是嘗試終止執行緒池,它會在所有可能終止執行緒池的地方被呼叫,滿足終止執行緒池的條件有兩個:首先,執行緒池狀態為STOP,或者為SHUTDOWN且任務快取佇列為空;其次,工作執行緒數量為0。

滿足了上述兩個條件之後,tryTerminate()方法獲取全域性鎖,設定執行緒池執行狀態為TIDYING,之後執行terminated()鉤子方法,最後設定執行緒池狀態為TERMINATED。

至此,執行緒池執行狀態變為TERMINATED,工作執行緒數量為0,workers已清空,且workQueue也已清空,所有執行緒都執行結束,執行緒池的生命週期到此結束。

5.關閉執行緒池

關閉執行緒池有兩個方法,shutdown()和shutdownNow(),下面分別看一下這兩個方法的實現。

shutdown()的實現

shutdown()方法將執行緒池執行狀態設定為SHUTDOWN,此時執行緒池不會接受新的任務,但會處理阻塞佇列中的任務。

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //獲取全域性鎖
        mainLock.lock();
        try {
            //檢查shutdown許可權
            checkShutdownAccess();
            //設定執行緒池執行狀態為SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中斷所有空閒worker
            interruptIdleWorkers();
            //用onShutdown()鉤子方法
            onShutdown();
        } finally {
            //釋放鎖
            mainLock.unlock();
        }
        //嘗試終止執行緒池
        tryTerminate();
    }

shutdown()方法首先會檢查是否具有shutdown的許可權,然後設定執行緒池的執行狀態為SHUTDOWN,之後中斷所有空閒的worker,再呼叫onShutdown()鉤子方法,最後嘗試終止執行緒池。

shutdown()方法呼叫了interruptIdleWorkers()方法中斷所有空閒的worker,其實現如下。

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    //onlyOne標識是否只中斷一個執行緒
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        //獲取全域性鎖
        mainLock.lock();
        try {
            //遍歷workers集合
            for (Worker w : workers) {
                //worker對應的執行緒
                Thread t = w.thread;
                //執行緒未被中斷且成功獲得鎖
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //發出中斷請求
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        //釋放鎖
                        w.unlock();
                    }
                }
                //若只中斷一個執行緒,則跳出迴圈
                if (onlyOne)
                    break;
            }
        } finally {
            //釋放鎖
            mainLock.unlock();
        }
    }
shutdownNow()的實現

shutdownNow()方法將執行緒池執行狀態設定為STOP,此時執行緒池不會接受新任務,也不會處理阻塞佇列中的任務,並且中斷正在執行的任務。

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        //獲取全域性鎖
        mainLock.lock();
        try {
            //檢查shutdown許可權
            checkShutdownAccess();
            //設定執行緒池執行狀態為STOP
            advanceRunState(STOP);
            //中斷所有worker
            interruptWorkers();
            //將任務快取佇列中等待執行的任務取出並放到list中
            tasks = drainQueue();
        } finally {
            //釋放鎖
            mainLock.unlock();
        }
        //嘗試終止執行緒池
        tryTerminate();
        //返回任務快取佇列中等待執行的任務列表
        return tasks;
    }

shutdownNow()方法與shutdown()方法相似,不同之處在於,前者設定執行緒池的執行狀態為STOP,之後中斷所有的worker(並非只是空閒的worker),嘗試終止執行緒池之後,返回任務快取佇列中等待執行的任務列表。

shutdownNow()方法呼叫了interruptWorkers()方法中斷所有的worker(並非只是空閒的worker),其實現如下。

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        //獲取全域性鎖
        mainLock.lock();
        try {
            //遍歷workers集合
            for (Worker w : workers)
                //呼叫Worker類的interruptIfStarted()方法中斷執行緒
                w.interruptIfStarted();
        } finally {
            //釋放鎖
            mainLock.unlock();
        }
    }

五、總結

至此,我們已經閱讀了執行緒池框架的核心類ThreadPoolExecutor類的大部分原始碼,由衷地讚歎這個類很多地方設計的巧妙之處:

  • 將執行緒池的執行狀態和工作執行緒數量打包在一起,並使用了大量的位運算
  • 使用CAS操作更新執行緒控制狀態ctl,確保對ctl的更新是原子操作
  • 內部類Worker類繼承了AQS,實現了一個自定義的同步器,實現了不可重入鎖
  • 使用while迴圈自旋地從任務快取佇列中獲取任務並執行,實現了執行緒複用機制
  • 呼叫interrupt()方法中斷執行緒,但注意該方法並不能直接中斷執行緒的執行,只是發出了中斷訊號,配合BlockingQueue的take(),poll()方法的使用,打斷執行緒的阻塞狀態

其實,執行緒池的本質就是生產者消費者模式,執行緒池的呼叫者不斷向執行緒池提交任務,執行緒池裡面的工作執行緒不斷獲取這些任務並執行(從任務快取佇列獲取任務或者直接執行任務)。