1. 程式人生 > 實用技巧 >JUC併發程式設計

JUC併發程式設計

1. 什麼是JUC

JUC 指的是 java.util .concurrent 工具包。這是一個處理執行緒的工具包,在此包中增加了在併發程式設計中很常用的工具類,用於定義類似於執行緒的自定義子系統、包括執行緒池、非同步 IO 和輕量級任務框架,還提供了設計用於多執行緒上下文中的 Collection 實現等;

JUC 是在JDK 1.5 開始出現的。下面一起來看看它怎麼使用。

2. 執行緒和程序

2-1. 程序正在進行中的程式(直譯),一個程序至少包含一個或多個執行緒。

2-2. 執行緒:程序中一個負責程式執行的控制單元(執行路徑),每一個執行緒都有自己執行的內容,這個內容可以稱之為執行緒要執行的任務;

1、Java預設有幾個執行緒?

答:2個,main主執行緒、GC執行緒

2、Java 真的可以開啟執行緒嗎?

答:不可以,只能通過本地方法呼叫底層的C++去操作

public synchronized void start() {

    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } 
finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { } } } // 本地方法,底層的C++ ,Java 無法直接操作硬體 private native void start0();
2-3. 併發、並行 併發程式設計:併發並行 併發(多執行緒操作同一個資源)
  • CPU一核,模擬出多條執行緒,快速交替
並行(多個人一起行走)
  • CPU多核,多個執行緒可以同時執行
// 獲取CPU核數
// CPU密集型、IO密集型
Runtime.getRuntime().availableProcessors();

併發程式設計的本質:充分利用CPU的資源

2-4. 執行緒的幾個狀態
public enum State {

    // 新生
    NEW,

    // 執行
    RUNNABLE,

    // 阻塞
    BLOCKED,

    // 等待(死等,也是阻塞的等待)
    WAITING,

    // 超時等待(指定的時間內等待,過時不候)
    TIMED_WAITING,

    // 終止
    TERMINATED;
}
2-5. sleep /wait區別
① 來自不同的類:
  - sleep來自Thread類;
  - wait來自Thread類; ② 是否釋放鎖:   - sleep方法不會釋放鎖;
  - wait會方法釋放鎖,使得其他執行緒可以使用同步控制塊或者方法(鎖程式碼塊和方法鎖); ③ 使用範圍:   - sleep可以在任何地方使用;
  - wait,notify和notifyAll只能在同步控制方法或者同步控制塊裡面使用; ④ 異常捕獲:   - sleep必須捕獲異常;
  - wait,notify和notifyAll不需要捕獲異常;

3. Lock鎖同步

在JDK1.5之前,解決多執行緒安全問題有兩種方式:sychronized 隱式鎖

  • 同步程式碼塊、同步方法

在JDK1.5之後,出現了更加靈活的方式:Lock 顯式鎖

  • 同步鎖

3-1. Lock需要通過lock()方法上鎖,通過unlock()方法釋放鎖。為了保證鎖能釋放,所有unlock方法一般放在finally中去執行。

Lock l = ...; l.lock(); try { // access the resource protected by this lock } finally { l.unlock(); }

○ Lock實現類:

○ReentrantLock:預設為非公平鎖

公平鎖:十分公平:可以先來後到

非公平鎖:十分不公平:可以插隊 (預設)

3-2. 再來看一下賣票案例: ● 傳統的Synchronized
public class SaleTicketDemo01 {
    public static void main(String[] args) {
        // 併發:多執行緒操作同一個類,把資源類丟入執行緒就可以了
        Ticket ticket = new Ticket();

        // @FunctionalInterface 函式式介面,jdk1.8 lambda表示式 (引數)->{ 程式碼 }
        new Thread(() -> { for (int i = 0; i < 30; i++) ticket.sale(); }, "001").start();
        new Thread(() -> { for (int i = 0; i < 30; i++) ticket.sale(); }, "002").start();
        new Thread(() -> { for (int i = 0; i < 30; i++) ticket.sale(); }, "003").start();
    }
}

// 資源類 OOP
class Ticket {

    private int ticket = 30;

    // synchronized 本質: 佇列,鎖
    public synchronized void sale() {
        if (ticket > 0) {
            System.out.println("視窗 " + Thread.currentThread().getName() + " 完成售票,餘票為:" + (--ticket));
        }
    }
}

多個執行緒同時操作共享資料ticket,所以會出現執行緒安全問題。會出現同一張票賣了好幾次或者票數為負數的情況。以前用同步程式碼塊和同步方法解決,現在看看用同步鎖怎麼解決。

● Lock介面:直接建立lock物件,然後用lock()方法上鎖,最後用unlock()方法釋放鎖即可。

public class SaleTicketDemo02 {
    public static void main(String[] args) {
        // 併發:多執行緒操作同一個類,把資源類丟入執行緒就可以了
        Ticket2 ticket2 = new Ticket2();

        new Thread(() -> { for (int i = 0; i < 30; i++) ticket2.sale(); }, "001").start();
        new Thread(() -> { for (int i = 0; i < 30; i++) ticket2.sale(); }, "002").start();
        new Thread(() -> { for (int i = 0; i < 30; i++) ticket2.sale(); }, "003").start();
    }
}
// 資源類
class Ticket2 {

    private int ticket = 30;

    private Lock lock = new ReentrantLock(); // 1.建立lock鎖

    public void sale() {
        lock.lock(); // 2.加鎖
        try {
            if (ticket > 0) {
                System.out.println("Lock視窗 " + Thread.currentThread().getName() + " 完成售票,餘票為:" + (--ticket));
            }
        } finally {
            lock.unlock(); // 3.解鎖
        }
    }
}

3-3.Synchronized 和 Lock 的區別

Synchronized 內建的 Java 關鍵字Lock 是一個 Java 類

Synchronized 無法判斷獲取鎖的狀態Lock 可以判斷是否獲取到了鎖

Synchronized 會自動釋放Lock 必須要手動釋放鎖!如果不釋放 會產生死鎖

Synchronized 執行緒 1(獲得鎖,阻塞),執行緒 2(等待獲取,死等)Lock 鎖就不一定會等待下去

Synchronized 可重入鎖,不可以中斷的,非公平Lock 可重入鎖,可以判斷鎖,非公平(可以自己設定)

Synchronized 適合鎖少量的程式碼同步問題Lock 適合鎖大量的同步程式碼!

4. 等待喚醒機制

4-1. 虛假喚醒問題

生產消費模式是等待喚醒機制的一個經典案例,看下面的程式碼:

public class TestProductorAndConsumer {
    public static void main(String[] args) {

        Data data = new Data();

        new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者01").start();
        new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者02").start();
        new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者03").start();
        new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者04").start();
    }
}

class Data {

    private int product = 0; // 共享資料

    // product +1
    public synchronized void increment() {
        while (product != 0) {
            try {
                this.wait(); // 等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        product++;
        System.out.println(Thread.currentThread().getName() + " => " + product);
        // 通知其他執行緒,我 +1 完畢了
        this.notifyAll();
    }

    // product -1
    public synchronized void decrement() {
        while (product == 0) {
            try {
                this.wait(); // 等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        product--;
        System.out.println(Thread.currentThread().getName() + " => " + product);
        // 通知其他執行緒,我 -1 完畢了
        this.notifyAll();
    }
}

● 問題存在:當存在 2 個以上的執行緒時,可能會產生虛假喚醒

● 解決辦法:將 if 改為 while 判斷

// product +1
public synchronized void increment() throws InterruptedException {
    while (product != 0)
        this.wait(); // 等待
    product++;
    System.out.println(Thread.currentThread().getName() + " => " + product);
    // 通知其他執行緒,我 +1 完畢了
    this.notifyAll();
}

// product -1
public synchronized void decrement() throws InterruptedException {
    while (product == 0)
        this.wait(); // 等待
    product--;
    System.out.println(Thread.currentThread().getName() + " => " + product);
    // 通知其他執行緒,我 -1 完畢了
    this.notifyAll();
}

只需要把 if 改成 while,每次都再去判斷一下,就可以了。

4-2. 用Lock鎖實現等待喚醒

public class TestProductorAndConsumer2 {
    public static void main(String[] args) {

        Data2 data = new Data2();
        
        new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者01").start();
        new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者02").start();
        new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者03").start();
        new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者04").start();
    }
}

class Data2 {

    private int product = 0; // 共享資料

    private Lock lock = new ReentrantLock(); //  建立鎖物件
    Condition condition = lock.newCondition(); // 獲取鎖的監視器物件

    // product +1
    public void increment() {
        lock.lock(); // 加鎖
        try {
            while (product != 0)
                condition.await(); // 等待
            product++;
            System.out.println("Lock" + Thread.currentThread().getName() + " => " + product);
            condition.signalAll();// 通知其他執行緒,我 +1 完畢了
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }

    // product -1
    public void decrement() {
        lock.lock(); // 加鎖
        try {
            while (product == 0)
                condition.await(); // 等待
            product--;
            System.out.println("Lock" + Thread.currentThread().getName() + " => " + product);
            // 通知其他執行緒,我 -1 完畢了
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }
}

使用lock同步鎖,就不需要sychronized關鍵字了,需要建立lock物件和condition例項。condition的await()方法、signal()方法和signalAll()方法分別與wait()方法、notify()方法和notifyAll()方法對應。

4-3. 執行緒按序交替

首先來看一道題:

編寫一個程式,開啟 3 個執行緒,這三個執行緒的 ID 分別為 A、B、C,
每個執行緒將自己的 ID 在螢幕上列印 10 遍,要求輸出的結果必須按順序顯示。
如:ABCABCABC…… 依次遞迴
分析:
執行緒本來是搶佔式進行的,要按序交替,所以必須實現執行緒通訊,
那就要用到等待喚醒。可以使用同步方法,也可以用同步鎖。

程式碼實現:

public class TestLoopPrint {
    public static void main(String[] args) {
        AlternationDemo alternationDemo = new AlternationDemo();

        new Thread(()->{for (int i = 0; i < 10; i++) alternationDemo.loopA(); },"執行緒A").start();
        new Thread(()->{for (int i = 0; i < 10; i++){alternationDemo.loopB();}},"執行緒B").start();
        new Thread(()->{for (int i = 0; i < 10; i++){alternationDemo.loopC();}},"執行緒C").start();
    }
}

class AlternationDemo {

    // 當前正在執行的執行緒的標記
    private int number = 1;
    // 建立鎖物件,並且使用該鎖建立三個監視器
    private Lock lock = new ReentrantLock();
    Condition condition_1 = lock.newCondition();
    Condition condition_2 = lock.newCondition();
    Condition condition_3 = lock.newCondition();

    public void loopA() {
        lock.lock();
        try {
            while (number != 1) { // 判斷
                condition_1.await(); // 等待
            }
            System.out.println(Thread.currentThread().getName() + " => AAAAA");
            // 喚醒指定的監視器2
            number = 2;
            condition_2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void loopB() {
        lock.lock();
        try {
            while (number != 2) { // 判斷
                condition_2.await(); // 等待
            }
            System.out.println(Thread.currentThread().getName() + " => BBBBBB");
            // 喚醒指定的監視器3
            number = 3;
            condition_3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void loopC() {
        lock.lock();
        try {
            while (number != 3) { // 判斷
                condition_3.await(); // 等待
            }
            System.out.println(Thread.currentThread().getName() + " => CCCCCCC");
            // 喚醒指定的監視器1
            number = 1;
            condition_1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
以上編碼就滿足需求。建立三個執行緒,分別呼叫loopA、loopB和loopC方法,這三個執行緒使用condition進行通訊。

5. 鎖的8種問題

1)標準情況

按照普通的情況訪問同步方法,檢視輸出

public class Test1 {
    public static void main(String[] args) {

        Phone1 phone = new Phone1();

        new Thread(() -> {phone.sendSms(); }, "A").start();

        try { // 睡眠2秒
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> { phone.call(); }, "B").start();
    }
}

class Phone1 {

    public synchronized void sendSms()  {
        System.out.println("發簡訊》》》");
    }

    public synchronized void call() {
        System.out.println("打電話!!!");
    }
}
View Code

- 執行結果:1、發簡訊 2、打電話

2)在其中一種方法中新增sleep方法訪問

在sendSms方法中新增sleep,檢視修改後的列印順序

class Phone1 {

    public synchronized void sendSms()  {
        try {
            TimeUnit.SECONDS.sleep(4); // 睡4秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("發簡訊》》》");
    }

    public synchronized void call() {
        System.out.println("打電話!!!");
    }
}
View Code

- 執行結果:1、發簡訊 2、打電話

synchronized 鎖的是物件this,兩個方法用的使用一個鎖,所以誰先拿到誰先執行

3)新增一個未加鎖方法,檢視訪問結果

在Phone類中新增hello方法,同時執行緒修改為訪問同步方法和普通方法

public class Test2 {
    public static void main(String[] args) {

        Phone2 phone = new Phone2();

        // 有鎖
        new Thread(() -> { phone.sendSms(); }, "A").start();

        try { // 睡眠2秒
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 沒鎖
        new Thread(() -> { phone.hello(); }, "B").start();
    }
}

class Phone2 {

    public synchronized void sendSms()  {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("發簡訊》》》");
    }

    public synchronized void call() { System.out.println("打電話!!!"); }

    // 這裡沒有鎖,不是同步方法,不受鎖的影響
    public void hello() {
        System.out.println("hello!");
    }
}
View Code

- 執行結果:1、hello 2、發簡訊

4)執行時建立兩個不同物件,通過不同物件訪問加鎖的方法

在建立執行緒時,通過不同物件執行同步方法,檢視執行結果

public class Test2 {
    public static void main(String[] args) {

        // 兩個物件,兩個呼叫者,兩把鎖!
        Phone2 phone1 = new Phone2();
        Phone2 phone2 = new Phone2();

        // 有鎖
        new Thread(() -> { phone1.sendSms(); }, "A").start();

        try { // 睡眠2秒
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 有鎖
        new Thread(() -> { phone2.call(); }, "B").start();
    }
}

class Phone2 {

    // synchronized 是 物件鎖this
    public synchronized void sendSms()  {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("發簡訊》》》");
    }

    public synchronized void call() { System.out.println("打電話!!!"); }

    // 這裡沒有鎖
    public void hello() { System.out.println("hello!"); }
}
View Code

- 執行結果:1、打電話 2、發簡訊

synchronized 鎖的是物件this。兩個物件,兩個呼叫者,兩把鎖!所以互不受影響

5)將加鎖的方法改為靜態方法,同一個物件執行

兩個同步方法都改為靜態方法,通過同一個物件執行方法,檢視執行結果

public class Test3 {
    public static void main(String[] args) {

        Phone3 phone = new Phone3();

        new Thread(() -> { phone.sendSms(); }, "A").start();

        try { // 睡眠2秒
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> { phone.call(); }, "B").start();
    }
}

class Phone3 {

    // static synchronized 靜態方法鎖
    public static synchronized void sendSms() {
        try {
            TimeUnit.SECONDS.sleep(4); // 睡眠4秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("發簡訊》》》");
    }

    public static synchronized void call() { System.out.println("打電話!!!"); }

}
View Code

- 執行結果:1、發簡訊 2、打電話

static synchronized靜態同步方法,類一載入就有了,所以鎖的是Class

6)通過兩個物件訪問靜態同步方法

public class Test3 {
    public static void main(String[] args) {

        // 兩個物件的Class類模板只有一個,static,鎖的是Class
        Phone3 phone1 = new Phone3();
        Phone3 phone2 = new Phone3();

        new Thread(() -> { phone1.sendSms(); }, "A").start();

        try { // 睡眠2秒
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> { phone2.call(); }, "B").start();
    }
}

class Phone3 {

    // static synchronized 靜態方法,類一載入就有了,所以鎖的是Class
    public static synchronized void sendSms() {
        try {
            TimeUnit.SECONDS.sleep(4); // 睡眠4秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("發簡訊》》》");
    }

    public static synchronized void call() {
        System.out.println("打電話!!!");
    }

}
View Code

- 執行結果:1、打電話 2、發簡訊

兩個物件的Class類模板只有一個,鎖的是Class,只有一把鎖

7)一個靜態同步方法 一個普通同步方法,通過同一個物件執行

其中的一個方法去掉靜態關鍵字,檢視執行結果

public class Test4 {
    public static void main(String[] args) {

        Phone4 phone = new Phone4();

        new Thread(() -> { phone.sendSms(); }, "A").start();

        try { // 睡眠2秒
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> { phone.call(); }, "B").start();
    }
}

class Phone4 {

    // 靜態同步方法 鎖Class
    public static synchronized void sendSms() {
        try { // 睡眠4秒
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("發簡訊》》》");
    }

    // 普通同步方法 鎖this
    public synchronized void call() { System.out.println("打電話!!!"); }

}
View Code

- 執行結果:1、打電話 2、發簡訊

靜態同步鎖 Class,普通同步鎖 this,兩把不同的鎖,所以互不受影響

8)一個靜態同步方法一個普通同步方法,兩個物件執行

public class Test4 {
    public static void main(String[] args) {

        Phone4 phone1 = new Phone4();
        Phone4 phone2 = new Phone4();

        new Thread(() -> { phone1.sendSms(); }, "A").start();

        try { // 睡眠2秒
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> { phone2.call(); }, "B").start();
    }
}

class Phone4 {

    // 靜態同步方法 鎖Class
    public static synchronized void sendSms() {
        try { // 睡眠4秒
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("發簡訊》》》");
    }

    // 普通同步方法 鎖this
    public synchronized void call() {
        System.out.println("打電話!!!");
    }

}
View Code

- 執行結果:1、打電話 2、發簡訊

兩個物件 兩把不同的鎖,一個Class 一個this

● 總結:

① 物件鎖:使用 synchronized 修飾非靜態的方法以及 synchronized(this) 同步程式碼塊使用的鎖是物件鎖。

② 類鎖:使用 synchronized 修飾靜態的方法以及 synchronized(class) 同步程式碼塊使用的鎖是類鎖。

③ 私有鎖在類內部宣告一個私有屬性如 private Object lock,在需要加鎖的同步塊使用 synchronized(lock)

它們的特性:

  • 物件鎖具有可重入性。
  • 當一個執行緒獲得了某個物件的物件鎖,則該執行緒仍然可以呼叫其他任何需要該物件鎖的 synchronized 方法或 synchronized(this) 同步程式碼塊。
  • 當一個執行緒訪問某個物件的一個 synchronized(this) 同步程式碼塊時,其他執行緒對該物件中所有其它 synchronized(this) 同步程式碼塊的訪問將被阻塞,因為訪問的是同一個物件鎖。
  • 每個類只有一個類鎖,但是類可以例項化成物件,因此每一個物件對應一個物件鎖。
  • 普通方法與同步方鎖法無關。
  • 類鎖和物件鎖不會產生競爭。
  • 私有鎖和物件鎖也不會產生競爭。
  • 使用私有鎖可以減小鎖的細粒度,減少由鎖產生的開銷。

6. 集合類不安全

多個執行緒共同操作一個執行緒不安全的類時報併發修改異常

6-1. List - 執行緒不安全

public class ListTest {
    public static void main(String[] args) {

//        List<String> list = new Vector<>();
//        List<String> list = Collections.synchronizedList(new ArrayList<>());
        List<String> list = new CopyOnWriteArrayList<>();

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(Thread.currentThread().getName() + " - " + list);
            }, String.valueOf(i)).start();
        }
    }
}
● 解決方案:

① 使用 Vector 物件,實際上是加了 Synchronized 同步,實現同步需要很高的花費,效率較低

② 使用 Collections 集合工具類,給執行緒不安全的集合新增一層封裝

③ 寫入時複製 CopyOnWriteArrayList,加了寫鎖的集合,鎖住的是整個物件,但讀操作可以併發執行

  - 寫入時複製:這種集合將資料放在固定的陣列中,任何資料的改變,都會重新建立一個新的陣列來記錄值。

  - 這種集合被設計用在,讀多寫少的時候推薦使用!

6-2. Set - 執行緒不安全

public class SetTest {
    public static void main(String[] args) {

//        Set<String> set = Collections.synchronizedSet(new HashSet<>());
        Set<String> set = new CopyOnWriteArraySet<>();

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(Thread.currentThread().getName() + " - " + set);
            }, String.valueOf(i)).start();
        }
    }
}

● 解決方案:

① 使用 Collections 集合工具類,給執行緒不安全的集合新增一層封裝

② CopyOnWriteArraySet 寫入時複製,讀多寫少的時候推薦使用

● HashSet 底層:

// HashSet是基於HashMap實現的
public HashSet() {
    map = new HashMap<>();
}

// add set 本質就是 map key 是無法重複的!
public boolean add(E e) {
    return map.put(e, PRESENT)==null;
}

// PRESENT 是用來填充 map 中的 value,定義為 Object 型別。
private static final Object PRESENT = new Object();

6-3. Map - 執行緒不安全

public class MapTest {

    public static void main(String[] args) {

//        Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
        Map<String, String> map = new ConcurrentHashMap<>();

        for (int i = 0; i < 30; i++) {
            int finalI = i;
            new Thread(() -> {
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
                System.out.println(Thread.currentThread().getName() + " - " + map);
            }, String.valueOf(i)).start();
        }
    }
}

解決方案:

① 使用 Collections 集合工具類,給執行緒不安全的集合新增一層封裝

② 使用執行緒安全的 hash 表 ConcurrentHashMap,注意:1.7前採用是鎖分段機制,1.8後採用了CAS演算法

1、Map 是這樣用的嗎?
  不是,工作中不使用 HashMap
2、預設等價於什麼?
  new HashMap<>(16, 0.75f)
  - 初始容量:static final int DEFAULT_INITIAL_CAPACITY = 1 << 4
  - 載入因子:static final float DEFAULT_LOAD_FACTOR = 0.75f

7. 建立執行緒的方式 --- 實現Callable介面

● Callable:類似於Runnable

- call 方法有返回值,並且能夠丟擲異常。

- 有快取,結果可能需要等待,會阻塞!

● FutureTask:獲取 Callable 任務的返回值

使用 Future 我們可以得知 Callable 的執行狀態,以及獲取 Callable 執行完後的返回值。

Future 的方法介紹:

  • get() :阻塞式,用於獲取 Callable/Runnable 執行完後的返回值。
  •     帶時間引數的get()過載方法用於最多等待的時間後,如仍未返回則執行緒將繼續執行。
  • cancel() :撤銷正在執行 Callable 的 Task。
  • isDone():是否執行完畢。
  • isCancelled():任務是否已經被取消。

● 使用例項:

public class CallableTest {
    public static void main(String[] args) {

        CallableDemo callableDemo = new CallableDemo();
        for (int i = 0; i < 10; i++) {
            // 執行callable方式,需要FutureTask實現類的支援,用來接收運算結果
            FutureTask<Integer> task = new FutureTask<>(callableDemo);
            // 執行執行緒任務
            new Thread(task, String.valueOf(i)).start();
            // 接收執行緒運算結果
            try {
                // 需要等到執行緒執行完呼叫get方法才會執行,也可以用於閉鎖操作
                System.out.println(task.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

class CallableDemo implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println(Thread.currentThread().getName() + " => call()");
        TimeUnit.SECONDS.sleep(1); // 睡眠1秒
        return new Random().nextInt(100);
    }
}

Callable介面和實現Runable介面的區別就是,Callable帶泛型,其call方法有返回值。使用的時候,需要用FutureTask來接收返回值。而且它也要等到執行緒執行完呼叫get方法才會執行,也可以用於閉鎖操作。

8. 讀寫鎖 -ReadWriterLock

我們在讀資料的時候,可以多個執行緒同時讀,不會出現問題,但是寫資料的時候,如果多個執行緒同時寫資料,那麼到底是寫入哪個執行緒的資料呢?

所以,如果有兩個執行緒,讀-寫寫-寫 需要互斥,讀-讀 不需要互斥。這個時候可以用讀寫鎖。

獨佔鎖(寫鎖):一次只能被一個執行緒佔有

共享鎖(讀鎖):多個執行緒可以同時佔有

程式碼實現

public class ReadWriteLockDemo {

    public static void main(String[] args) {
        RWLTest rwlTest = new RWLTest();

        // 寫入
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            new Thread(() -> {
                rwlTest.set(String.valueOf(finalI), finalI + "");
            }, String.valueOf(i)).start();
        }

        // 讀取
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            new Thread(() -> {
                rwlTest.get(String.valueOf(finalI));
            }, String.valueOf(i)).start();
        }
    }
}

class RWLTest {

    private volatile Map<String, Object> map = new HashMap<>();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    // 讀(可以多個執行緒同時操作)
    public void get(String key) {
        readWriteLock.readLock().lock();// 上鎖
        try {
            System.out.println(Thread.currentThread().getName() + " -> 讀取 key[" + key + "]");
            Object obj = map.get(key);
            System.out.println(Thread.currentThread().getName() + " -> 獲得 value[" + obj + "]");
        } finally {
            readWriteLock.readLock().unlock();// 釋放鎖
        }
    }

    // 寫(一次只能有一個執行緒操作)
    public void set(String key, Object value) {
        readWriteLock.writeLock().lock();// 上鎖
        try {
            System.out.println(Thread.currentThread().getName() + " -> 寫入 key[" + key + "]");
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + " -> 寫入成功!!!");
        } finally {
            readWriteLock.writeLock().unlock();// 釋放鎖
        }
    }
}

執行結果:

9. 阻塞佇列 -BlockingQueue

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。

① 支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。

② 支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。

9-1. 阻塞佇列的四種方法

方法\處理方式丟擲異常有返回值,不丟擲異常阻塞等待超時等待
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查隊首方法 element() peek() 不可用 不可用

● 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。

● 有返回值,不丟擲異常:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null

● 阻塞等待:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。

● 超時等待:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出等待。

程式碼演示:

public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {

//        test01();
//        test02();
//        test03();
        test04();
    }

    /* 丟擲異常 */
    public static void test01() {
        // 佇列的大小
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue(3);

        System.out.println(queue.add("a"));
        System.out.println(queue.add("b"));
        System.out.println(queue.add("c"));
//        System.out.println(queue.add("d")); // 佇列已滿,丟擲異常 IllegalStateException

        System.out.println("隊首 = " + queue.element()); // 檢測隊首元素

        System.out.println("==================");
        System.out.println(queue.remove());
        System.out.println(queue.remove());
        System.out.println(queue.remove());
//        System.out.println(queue.remove()); // 佇列置空,沒有元素,丟擲異常 NoSuchElementException
    }

    /* 有返回值,不丟擲異常 */
    private static void test02() {

        ArrayBlockingQueue queue = new ArrayBlockingQueue(3);

        System.out.println(queue.offer("111"));
        System.out.println(queue.offer("222"));
        System.out.println(queue.offer("333"));
        System.out.println(queue.offer("444")); // false,不丟擲異常

        System.out.println("隊首 = " + queue.peek()); // 檢測隊首元素

        System.out.println("======================================");

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll()); // null,不丟擲異常

    }

    /* 等待,阻塞(一直阻塞) */
    private static void test03() throws InterruptedException {

        ArrayBlockingQueue queue = new ArrayBlockingQueue(3);

        // 一直阻塞
        queue.put("AAA");
        queue.put("BBB");
        queue.put("CCC");
//        queue.put("DDD"); // 佇列已滿,等待新增 -> 阻塞

        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
//        System.out.println(queue.take()); // 佇列為空,等待獲取 -> 阻塞
    }

    /* 有返回值,不丟擲異常,等待,阻塞(等待超時) */
    private static void test04() throws InterruptedException {

        ArrayBlockingQueue queue = new ArrayBlockingQueue(3);

        System.out.println(queue.offer("A1", 2, TimeUnit.SECONDS));
        System.out.println(queue.offer("B2", 2, TimeUnit.SECONDS));
        System.out.println(queue.offer("C3", 2, TimeUnit.SECONDS));
        System.out.println(queue.offer("D4", 2, TimeUnit.SECONDS)); // 等待超過兩秒,就返回false

        System.out.println("================================");

        System.out.println(queue.poll(2, TimeUnit.SECONDS));
        System.out.println(queue.poll(2, TimeUnit.SECONDS));
        System.out.println(queue.poll(2, TimeUnit.SECONDS));
        System.out.println(queue.poll(2, TimeUnit.SECONDS)); // 等待超過兩秒,就返回null
    }
}
BlockingQueueTest

9-2. Java裡的阻塞佇列

JDK7提供了7個阻塞佇列。分別是:

ArrayBlockingQueue:是用陣列實現的有界阻塞佇列,初始化時必須傳入容量,是FIFO佇列,支援公平和非公平鎖。

LinkedBlockingQueue:是用連結串列實現的有界阻塞佇列,初始化時如果沒有傳入容量,則容量時Intger.MAX_VALUE,是FIFO佇列,因為入隊和出隊方法各是一把鎖,所以一般情況下併發效能優於ArrayBlockingQueue。

LinkedBlockingDeque:是雙向連結串列實現的有界阻塞佇列,初始化時如果沒有傳入容量,則容量時Intger.MAX_VALUE,可以實現FIFO佇列,也可以實現LIFO佇列。

PriorityBlockingQueue:是陣列實現的具有優先順序的無界阻塞佇列,有兩個注意事項,

  1. 該佇列是具有優先順序的,不是按先進先出或者後進先出,是按優先順序的高低,所以入隊的物件必須是實現了Comparable介面的。
  2. 佇列雖然邏輯上是無界的,但因為是陣列實現的,不能無限擴容,作者在程式碼裡限制了,最大容量是Intger.MAX_VALUE-8。

DelayQueue:延遲佇列可以指定多久才能從佇列中獲取當前元素。只有延時期滿後才能從佇列中獲取元素, 元素必須是Delayed的子類

SynchronousQueue:不儲存元素的阻塞佇列,每一個put操作必須等待take操作,否則不能新增元素。支援公平鎖和非公平鎖。

LinkedTransferQueue:由連結串列結構組成的無界阻塞佇列,對比LinkedBlockingQueue除了有put、take等方法外,還提供了transfer方法,該方法會阻塞執行緒,直到元素被消費,才返回。

上面列舉了阻塞佇列的異同點,我們可以根據自己的業務場景選擇合適的阻塞佇列。

簡單介紹下其中兩個佇列:

9-2-1. ArrayBlockingQueue

public interface BlockingQueue<E> extends Queue<E> {
    //入隊,入隊失敗會丟擲異常
    boolean add(E e);

    //入隊,入隊成功返回true,否則返回false
    boolean offer(E e);

    //入隊,入隊成功返回,否則進行等待
    void put(E e) throws InterruptedException;

    //入隊,不成功等待一段時間,仍然不能入隊成功返回false,入隊成功返回true
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    //出隊,佇列為空進行等待,否則將元素出隊
    E take() throws InterruptedException;

    //出隊,佇列為空進行一段時間的等待,仍然為空返回null,否則將元素出隊
    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    //返回剩餘餘量
    int remainingCapacity();

    //將某個元素出隊,存在返回true,否則返回false
    boolean remove(Object o);

    //判斷是否包含某個元素
    public boolean contains(Object o);

    //將佇列中所有可用元素出隊到集合C中
    int drainTo(Collection<? super E> c);

    //將佇列中最多maxElements個可用元素出隊到集合C中
    int drainTo(Collection<? super E> c, int maxElements);
}
BlockingQueue

ArrayBlockingQueue是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證訪問者公平的訪問佇列。

什麼是公平的訪問佇列?

所謂公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下程式碼建立一個公平的阻塞

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的,程式碼如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

9-2-2. SynchronousQueue

SynchronousQueue 是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素。

適用場景:執行緒之間資料傳遞,一個執行緒使用過的資料,傳給另外一個執行緒使用。

宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

公平模式和非公平模式的區別:

- 公平模式:採用公平鎖,在獲取鎖時,增加了isFirst(current)判斷,當且僅當,等待佇列為空或當前執行緒是等待佇列的頭結點時,才可嘗試獲取鎖。

- 非公平模式(預設):採用非公平鎖,那些嘗試獲取鎖且尚未進入等待佇列的執行緒會和等待佇列head結點的執行緒發生競爭。

程式碼演示:

/**
 * 同步佇列 - SynchronousQueue
 * 和其它的BlockingQueue不一樣,SynchronousQueue不儲存元素
 * put一個元素,必須take取出一個元素出來,否則不能再put進去
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) {

        SynchronousQueue queue = new SynchronousQueue();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " Put 111");
                queue.put("111");
                System.out.println(Thread.currentThread().getName() + " Put 222");
                queue.put("222");
                System.out.println(Thread.currentThread().getName() + " Put 333");
                queue.put("333");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T1").start();

        new Thread(() -> {

            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " - Take " + queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " - Take " + queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " - Take " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T2").start();
    }
}
SynchronousQueueDemo

10. 執行緒池

執行緒池就是首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一條執行緒來執行這個任務,執行結束以後,該執行緒並不會死亡,而是再次返回執行緒池中成為空閒狀態,等待執行下一個任務。

● 執行緒池工作機制

① 線上程池的程式設計模式下,任務是提交給整個執行緒池,而不是直接提交給某個執行緒,執行緒池在拿到任務後,就在內部尋找是否有空閒的執行緒,如果有,則將任務交給某個空閒的執行緒。

② 一個執行緒同時只能執行一個任務,但可以同時向一個執行緒池提交多個任務。

● 為什麼使用?

多執行緒執行時間,系統不斷的啟動和關閉新執行緒,成本非常高,會過渡消耗系統資源,以及過渡切換執行緒的危險,從而可能導致系統資源的崩潰。這時,執行緒池就是最好的選擇了。

10-1. 執行緒池的好處

降低系統資源消耗。通過重用已存在的執行緒,降低執行緒建立和銷燬造成的消耗;

提高系統響應速度。當有任務到達時,通過複用已存在的執行緒,無需等待新執行緒的建立便能立即執行;

方便執行緒併發數的管控。因為執行緒若是無限制的建立,可能會導致記憶體佔用過多而產生OOM,並且會造成cpu過度切換(cpu切換執行緒是有時間成本的(需要保持當前執行執行緒的現場,並恢復要執行執行緒的現場))。

提供更強大的功能。延時執行、定時迴圈執行的策略等

10-2. java中提供的執行緒池

Executors類提供了4種不同的執行緒池:newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool

① newCachedThreadPool():建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。

適用於負載較輕的場景,執行短期非同步任務。(可以使得任務快速得到執行,因為任務時間執行短,可以很快結束,也不會造成cpu過度切換)

可快取執行緒池:

  1. 執行緒數無限制
  2. 有空閒執行緒則複用空閒執行緒,若無空閒執行緒則新建執行緒
  3. 終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒
  4. 一定程式減少頻繁建立/銷燬執行緒,減少系統開銷

建立方法:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

原始碼:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

示例程式碼:

public class NewCachedThreadPoolTest {

    public static void main(String[] args) {
        // 建立一個可快取執行緒池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            try {
                // sleep可明顯看到使用的是執行緒池裡面以前的執行緒,沒有建立新的執行緒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    // 列印正在執行的快取執行緒資訊
                    System.out.println(Thread.currentThread().getName()
                            + "正在被執行");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
View Code

② newFiexedThreadPool(int Threads):建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。

適用於負載較重的場景,對當前執行緒數量進行限制。(保證執行緒數可控,不會造成執行緒過多,導致系統負載更為嚴重)

定長執行緒池:

  1. 可控制執行緒最大併發數(同時執行的執行緒數)
  2. 超出的執行緒會在佇列中等待

建立方法:

//nThreads => 最大執行緒數即 maximumPoolSize
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);

//threadFactory => 建立執行緒的方
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

原始碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

示例程式碼:

public class NewFixedThreadPoolTest {

    public static void main(String[] args) {
        // 建立一個可重用固定個數的執行緒池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            fixedThreadPool.execute(new Runnable() {
                public void run() {
                    try {
                        // 列印正在執行的快取執行緒資訊
                        System.out.println(Thread.currentThread().getName() + "正在被執行");
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
View Code

SingleThreadExecutor()建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

適用於需要保證順序執行各個任務。

單執行緒化的執行緒池:

  1. 有且僅有一個工作執行緒執行任務
  2. 所有任務按照指定順序執行,即遵循佇列的入隊出隊規則

建立方法:

ExecutorService singleThreadPool = Executors.newSingleThreadPool();

原始碼:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

示例程式碼:

public class NewSingleThreadExecutorTest {


    public static void main(String[] args) {
        //建立一個單執行緒化的執行緒池
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {
                public void run() {
                    try {
                        //結果依次輸出,相當於順序執行各個任務
                        System.out.println(Thread.currentThread().getName() + "正在被執行,列印的值是:" + index);
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
View Code

newScheduledThreadPool(int corePoolSize):建立一個定長執行緒池,支援定時及週期性任務執行。

適用於執行延時或者週期性任務。

定長執行緒池:

  1. 支援定時及週期性任務執行。

建立方法:

//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

原始碼:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());
}

示例程式碼:

public class NewScheduledThreadPoolTest {

    public static void main(String[] args) {
        //建立一個定長執行緒池,支援定時及週期性任務執行——延遲執行
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        //延遲1秒執行
                 /*scheduledThreadPool.schedule(new Runnable() {
                     public void run() {
                        System.out.println("延遲1秒執行");
                     }
                 }, 1, TimeUnit.SECONDS);*/


        //延遲1秒後每3秒執行一次
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("延遲1秒後每3秒執行一次");
            }
        }, 1, 3, TimeUnit.SECONDS);

    }
}
View Code

10-3. ThreadPoolExecutor(重要)

在阿里巴巴開發手冊中,明確規定執行緒池不允許使用 Executors 去建立,所以我們需要使用 ThreadPoolExecutor 的方式建立執行緒池。

● ThreadPoolExecutor提供了四個建構函式:

// 五個構造引數
public
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // 六個構造引數 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } // 六個構造引數 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 七個構造引數,本質的ThreadPoolExecutor() public ThreadPoolExecutor(int corePoolSize, // 核心執行緒池大小 int maximumPoolSize,// 最大核心執行緒池大小 long keepAliveTime, // 超時了沒有人呼叫就會釋放 TimeUnit unit, // 超時單位 BlockingQueue<Runnable> workQueue, // 阻塞佇列 ThreadFactory threadFactory, // 執行緒工廠:建立執行緒的,一般不用動 RejectedExecutionHandler handler){ // 拒絕策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

● 執行緒池的主要引數:

① int corePoolSize:該執行緒池中核心執行緒數最大值

執行緒池新建執行緒的時候,如果當前執行緒總數小於corePoolSize,則新建的是核心執行緒,如果超過corePoolSize,則新建的是非核心執行緒

核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹(閒置狀態)。

如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉

② int maximumPoolSize:該執行緒池中執行緒總數最大值

執行緒總數 = 核心執行緒數 + 非核心執行緒數。

③ long keepAliveTime:該執行緒池中非核心執行緒閒置超時時長

當執行緒池中執行緒數大於核心執行緒數時,執行緒的空閒時間如果超過執行緒存活時間,那麼這個執行緒就會被銷燬,直到執行緒池中的執行緒數小於等於核心執行緒數。

如果設定allowCoreThreadTimeOut = true,則會作用於核心執行緒

④ TimeUnit uni:keepAliveTime的超時單位,TimeUnit是一個列舉型別

NANOSECONDS : 1微毫秒 = 1微秒 / 1000

MICROSECONDS : 1微秒 = 1毫秒 / 1000

MILLISECONDS : 1毫秒 = 1秒 /1000

SECONDS : 秒

MINUTES : 分

HOURS : 小時

DAYS : 天

⑤BlockingQueue<Runnable> workQueue:阻塞佇列

該執行緒池中的任務佇列:維護著等待執行的Runnable物件

當所有的核心執行緒都在幹活時,新新增的任務會被新增到這個佇列中等待處理,如果佇列滿了,則新建非核心執行緒執行任務

⑥ ThreadFactory threadFactory:建立執行緒的方式(一般用不上)

這是一個介面,你new他的時候需要實現他的 Thread newThread(Runnable r)方法

⑦ RejectedExecutionHandler handler:拒絕策略,執行緒池和佇列都滿了,再加入執行緒會執行此策略。

這玩意兒就是丟擲異常專用的,比如上面提到的兩個錯誤發生了,就會由這個handler丟擲異常,你不指定他也有個預設的

○ 程式碼演示

public class Demo01 {
    public static void main(String[] args) {

        ExecutorService threadExecutor = new ThreadPoolExecutor(
                2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() // 丟棄任務,並丟擲RejectedExecutionException異常 【 預設 】
            );

        try {
            // 最大承載:maximumPoolSize + workQueue,超過則丟擲 RejectedExecutionException 異常
            for (int i = 0; i < 9; i++) {
                // 使用執行緒池建立執行緒
                threadExecutor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " => OK");
                });
            }
        } finally {
            // 執行緒池用完,程式結束,關閉執行緒池
            threadExecutor.shutdown();
        }
    }
}

10-4.執行緒池的四種拒絕策略

① ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。

這是執行緒池預設的拒絕策略,在任務不能再提交的時候,丟擲異常,及時反饋程式執行狀態。

如果是比較關鍵的業務,推薦使用此拒絕策略,這樣子在系統不能承載更大的併發量的時候,能夠及時的通過異常發現。

② ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

如果任務被拒絕了,則由呼叫執行緒(提交任務的執行緒)直接執行此任務。

③ ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不丟擲異常。如果執行緒佇列已滿,則後續提交的任務都會被丟棄,且是靜默丟棄。

使用此策略,可能會使我們無法發現系統的異常狀態。

建議是一些無關緊要的業務採用此策略。

例如,本人的部落格網站統計閱讀量就是採用的這種拒絕策略。

④ ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新提交被拒絕的任務。

此拒絕策略,是一種喜新厭舊的拒絕策略。

是否要採用此種拒絕策略,還得根據實際業務是否允許丟棄老任務來認真衡量。

10-5. 如何配置執行緒池

● CPU密集型任務

儘量使用較小的執行緒池,一般為CPU核心數+1。 因為CPU密集型任務使得CPU使用率很高,若開過多的執行緒數,會造成CPU過度切換。

● IO密集型任務

可以使用稍大的執行緒池,一般為2*CPU核心數。 IO密集型任務CPU使用率並不高,因此可以讓CPU在等待IO的時候有其他執行緒去處理別的任務,充分利用CPU時間。

● 混合型任務

可以將任務分成IO密集型和CPU密集型任務,然後分別用不同的執行緒池去處理。 只要分完之後兩個任務的執行時間相差不大,那麼就會比序列執行來的高效。
因為如果劃分之後兩個任務執行時間有資料級的差距,那麼拆分沒有意義。
因為先執行完的任務就要等後執行完的任務,最終的時間仍然取決於後執行完的任務,而且還要加上任務拆分與合併的開銷,得不償失

11. 非同步函數語言程式設計 - CompletableFuture

使用 FutureFuture獲得非同步執行結果時,要麼呼叫阻塞方法 get() ,要麼輪詢看 isDone()是否為 true,這兩種方法都不是很好,因為主執行緒也會被迫等待。

從 Java 8 開始引入了 CompletableFuture,它針對 Future做了改進,可以傳入回撥物件,當非同步任務完成或者發生異常時,自動呼叫回撥物件的回撥方法。

11-1. 建立 CompletableFuture 物件

CompletableFuture 提供了四個靜態方法用來建立 CompletableFuture 物件

- public static CompletableFuture<Void> runAsync(Runnable runnable)

- public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Asynsc 表示非同步,而 supplyAsync runAsync不同在與前者非同步返回一個結果,後者是 void 。第二個函式第二個引數表示是用我們自己建立的執行緒池,否則採用預設的 ForkJoinPool.commonPool()作為它的執行緒池.其中 Supplier是一個函式式介面,代表是一個生成者的意思,傳入 0 個引數,返回一個結果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            return "hello world";
  });
System.out.println(future.get());  //阻塞的獲取結果  ''helllo world"

11-2. Demo

public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 沒有返回值的 runAsync 非同步回撥
        // runAsync();
        // 有返回值的 supplyAsync 非同步回撥
        supplyAsync();
    }

    // 沒有返回值的 runAsync 非同步回撥
    public static void runAsync() throws ExecutionException, InterruptedException {
// 建立非同步執行任務 CompletableFuture
<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " runAsync => Void"); }); System.out.println("11111111"); // 獲取阻塞執行結果 future.get(); } // 有返回值的 supplyAsync 非同步回撥 public static void supplyAsync() throws ExecutionException, InterruptedException { // 建立非同步執行任務: CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " runAsync => Void"); // int i = 10 / 0; // 異常呼叫失敗回撥 return 2048; }); // 成功和失敗的回撥 Integer result = supplyAsync // 如果執行成功: .whenComplete((t, u) -> { System.out.println("t => " + t); // 正常的返回結果 System.out.println("u => " + u); // 錯誤描述資訊:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero }) // 如果執行異常: .exceptionally((e) -> { System.out.println(e.getMessage()); return 404; // 錯誤的返回結果 }) // 獲取執行結果 .get(); System.out.println("result => " + result); } }

12深入單例模式

單例模式(Singleton Pattern)是 Java 中最簡單的設計模式之一。這種型別的設計模式屬於建立型模式,它提供了一種建立物件的最佳方式。

這種模式涉及到一個單一的類,該類負責建立自己的物件,同時確保只有單個物件被建立。這個類提供了一種訪問其唯一的物件的方式,可以直接訪問,不需要例項化該類的物件。

注意:

  • 1、單例類只能有一個例項。
  • 2、單例類必須自己建立自己的唯一例項。
  • 3、單例類必須給所有其他物件提供這一例項。

多種單例的特性:

單例模式 是否推薦 懶載入 反序列化單例 反射單例 克隆單例 效能、失效問題
餓漢式 Eager載入推薦 x x x x 類載入時就初始化,浪費記憶體。
懶漢式(同步方法) x x x

x

存在效能問題,每次獲取示例都會進行同步
雙重檢測鎖(DCL) 可用 x x x

JDK < 1.5 失效

靜態內部類 推薦 x x x
列舉 最推薦 x

JDK < 1.5 不支援

自動支援序列化機制,絕對防止多次例項化

12-1. 單例模式的幾種實現方式

① 餓漢式:該模式在類被載入時就會例項化一個物件。

該模式能簡單快速的建立一個單例物件,而且是執行緒安全的(只在類載入時才會初始化,以後都不會)。但它有一個缺點,就是不管你要不要都會直接建立一個物件,會消耗一定的效能(當然很小很小,幾乎可以忽略不計,所以這種模式在很多場合十分常用而且十分簡單)

// 餓漢式單例
public class Hungry {

    // 在類裝載時就例項化
    private static Hungry HUNGRY = new Hungry();

    // 私有化構造方法
    private Hungry() { }

    // 提供方法讓外部獲取例項
    public static Hungry getInstance() {
        return HUNGRY;
    }
}

這種做法很方便的幫我們解決了多執行緒例項化的問題,但是缺點也很明顯。

因為這句程式碼 private static Hungry HUNGRY = new Hungry(); 的關係,所以該類一旦被jvm載入就會馬上例項化!

那如果我們不想用這個類怎麼辦呢? 是不是就浪費了呢?既然這樣,我們來看下替代方案! 懶漢式。

② 懶漢式:該模式只在你需要物件時才會生成單例物件(比如呼叫getInstance方法)

這種方式具備很好的 lazy loading,能夠在多執行緒中很好的工作,但是,效率很低,99% 情況下不需要同步。

public class LazyMan {

    private static LazyMan LAZY_MAN;

    private LazyMan() { }

    public synchronized static LazyMan getInstance() {
        if (LAZY_MAN== null)
            return new LazyMan();
        return LAZY_MAN;
    }
}

從執行緒安全性上講,不加同步的懶漢式是執行緒不安全的,比如說:有兩個執行緒,一個是執行緒A,一個是執行緒B,它們同時呼叫getInstance方法,那就可能導致併發問題。

所以只要加上 Synchronized 即可,但是這樣一來,會降低整個訪問的速度,而且每次都要判斷,也確實是稍微慢點。

那麼有沒有更好的方式來實現呢?雙重檢查加鎖,可以使用“雙重檢查加鎖”的方式來實現,就可以既實現執行緒安全,又能夠使效能不受到大的影響。

③雙檢鎖/雙重校驗鎖(DCL,即 double-checked locking):這種方式採用雙鎖機制,安全且在多執行緒情況下能保持高效能。

雙重檢查加鎖機制並不是每次進入getInstance方法都需要同步,而是先不同步,進入方法過後,先檢查例項是否存在,如果不存在才進入下面的同步塊,這是第一重檢查。進入同步塊過後,再次檢查例項是否存在,如果不存在,就在同步的情況下建立一個例項,這是第二重檢查。這樣一來,就只需要同步一次了,從而減少了多次在同步情況下進行判斷所浪費的時間。

public class DoubleCheck {

    private volatile static DoubleCheck DOUBLE_CHECK;

    private DoubleCheck() { }

    public static DoubleCheck getInstance() {
        // 先檢查例項是否存在,如果不存在才進入下面同步塊
        if (DOUBLE_CHECK == null) {
// 同步塊,執行緒安全的建立例項
synchronized (DoubleCheck.class) { // 再次檢查例項是否存在,如果不存在則建立例項
if (DOUBLE_CHECK == null) return new DoubleCheck(); } } return DOUBLE_CHECK; } }

volatile關鍵字:將不會被本地執行緒快取,所有對該變數的讀寫都是直接操作共享記憶體,從而確保多個執行緒能正確的處理該變數。

注意:在Java1.4及以前版本中,很多JVM對於volatile關鍵字的實現有問題,會導致雙重檢查加鎖的失敗,因此雙重檢查加鎖的機制只能用在Java5及以上的版本。

這種實現方式既可使實現執行緒安全的建立例項,又不會對效能造成太大的影響,它 只在第一次建立例項的時候同步,以後就不需要同步了,從而加快執行速度

由於 volatile 關鍵字可能會遮蔽掉虛擬機器中一些必要的程式碼優化,所以執行效率並不是很高 ,因此一般建議,沒有特別的需要,不要使用。

也就是說,雖然可以使用雙重加鎖機制來實現執行緒安全的單例,但並不建議大量採用,根據情況來選用吧。

④ 靜態內部類:採用類級內部類,在這個類級內部類裡面去建立物件例項,只要不使用到這個類級內部類,那就不會建立物件例項

這種方式能達到雙檢鎖方式一樣的功效,但實現更簡單。對靜態域使用延遲初始化,應使用這種方式而不是雙檢鎖方式。這種方式只適用於靜態域的情況,雙檢鎖方式可在例項域需要延遲初始化時使用。

當getInstance方法第一次被呼叫的時候,它第一次讀取SingletonHolder.instance,導致SingletonHolder類得到初始化;而這個類在裝載並被初始化的時候,會初始化它的靜態域,從而建立Singleton的例項,由於是靜態的域,因此只會被虛擬機器在裝載類的時候初始化一次,並由虛擬機器來保證它的執行緒安全性。

public class Holder {
    // 私有構造方法
    private Holder() { }

    /**
     * 類級的內部類,也就是靜態的成員式內部類,該內部類的例項與外部類的例項
     * 沒有繫結關係,而且只有被呼叫到才會裝載,從而實現了延遲載入
     */  
    private static class InnerClass {
        // 靜態初始化器,由JVM來保證執行緒安全
        private static final Holder HOLDER = new Holder();
    }

    public static final Holder getInstance() {
        return Holder.getInstance();
    }
}

這個模式的優勢在於,getInstance 方法並沒有被同步,並且只是執行一個域的訪問,因此延遲初始化並沒有增加任何訪問成本。

⑤ 列舉:列舉實現單例是最為推薦的一種方法,因為它更簡潔並且就算通過序列化,反射等也沒辦法破壞單例性

  • Java的列舉型別實質上是功能齊全的類,因此可以有自己的屬性和方法;
  • Java列舉型別的基本思想:通過公有的靜態final域為每個列舉常量匯出例項的類
  • 從某個角度講,列舉是單例的泛型化,本質上是單元素的列舉

這種方式是 Effective Java 作者 Josh Bloch 提倡的方式,它不僅能避免多執行緒同步問題,而且還自動支援序列化機制,防止反序列化重新建立新的物件,絕對防止多次例項化。不過,由於 JDK1.5 之後才加入 enum 特性,用這種方式寫不免讓人感覺生疏,在實際工作中,也很少用。

不能通過 reflection attack 來呼叫私有構造方法。

public enum EnumSingle {

    INSTANCE;

    public EnumSingle getInstance() {
        return INSTANCE;
    }
}

使用列舉來實現單例項控制,會更加簡潔,而且無償的提供了序列化的機制,並由JVM從根本上提供保障,絕對防止多次例項化,是更簡潔、高效、安全的實現單例的方式。

13. 公平鎖、非公平鎖、重入鎖、自旋鎖

13-1. 公平鎖、非公平鎖

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

1)是什麼

公平鎖就是先來後到、非公平鎖就是允許加塞, Lock lock = new ReentrantLock(Boolean fair); 預設非公平。

  • 公平鎖:多個執行緒按照申請鎖的順序來獲取鎖,類似排隊打飯。

  • 非公平鎖:多個執行緒獲取鎖的順序並不是按照申請鎖的順序,有可能後申請的執行緒優先獲取鎖,在高併發的情況下,有可能會造成優先順序反轉或者節現象。

2)兩者區別

  • 公平鎖:Threads acquire a fair lock in the order in which they requested it

    公平鎖,就是很公平,在併發環境中,每個執行緒在獲取鎖時,會先檢視此鎖維護的等待佇列,如果為空,或者當前執行緒就是等待佇列的第一個,就佔有鎖,否則就會加入到等待佇列中,以後會按照FIFO的規則從佇列中取到自己。

  • 非公平鎖:a nonfair lock permits barging: threads requesting a lock can jump ahead of the queue of waiting threads if the lock happens to be available when it is requested.

    非公平鎖比較粗魯,上來就直接嘗試佔有額,如果嘗試失敗,就再採用類似公平鎖那種方式。

3)other

對 Java ReentrantLock 而言,通過建構函式指定該鎖是否公平,預設是非公平鎖,非公平鎖的優點在於吞吐量比公平鎖大。
對 Synchronized 而言,是一種非公平鎖。

15-2. 重入鎖(遞迴鎖)

1)遞迴鎖是什麼

指的時同一執行緒外層函式獲得鎖之後,內層遞迴函式仍然能獲取該鎖的程式碼,在同一個執行緒在外層方法獲取鎖的時候,在進入內層方法會自動獲取鎖,也就是說,執行緒可以進入任何一個它已經擁有的鎖所同步著的程式碼塊

2)ReentrantLock / Synchronized 就是一個典型的可重入鎖

3)可重入鎖最大的作用是避免死鎖

4)程式碼示例

● synchronized

public class Demo01 {

    public static void main(String[] args) {

        Phone phone = new Phone();

        new Thread(() -> { phone.sms(); }, "A").start();
        new Thread(() -> { phone.call(); }, "B").start();
    }
}

class Phone {

    // 鎖 1
    public synchronized void sms() {
        System.out.println(Thread.currentThread().getName() + " => sms");
        try {
//            TimeUnit.SECONDS.sleep(3); // 執行緒睡眠,鎖不釋放
//            wait(); // 執行緒等待,釋放鎖
        } catch (Exception e) {
            e.printStackTrace();
        }
        call(); // 鎖2 - 這裡也有鎖
    }

    // 鎖 2
    public synchronized void call() {
        System.out.println(Thread.currentThread().getName() + " => call");
//        notify(); // 喚醒 A
    }
}

● ReentrantLock

public class Demo02 {
    public static void main(String[] args) {

        Phone2 phone2 = new Phone2();

        new Thread(() -> { phone2.sms(); }, "A").start();
        new Thread(() -> { phone2.call(); }, "B").start();
    }
}

class Phone2 {

    Lock lock = new ReentrantLock();

    public void sms() {
        lock.lock(); //