JUC併發程式設計
1. 什麼是JUC
JUC 指的是 java.util .concurrent 工具包。這是一個處理執行緒的工具包,在此包中增加了在併發程式設計中很常用的工具類,用於定義類似於執行緒的自定義子系統、包括執行緒池、非同步 IO 和輕量級任務框架,還提供了設計用於多執行緒上下文中的 Collection 實現等;JUC 是在JDK 1.5 開始出現的。下面一起來看看它怎麼使用。
2. 執行緒和程序
2-1. 程序:正在進行中的程式(直譯),一個程序至少包含一個或多個執行緒。
2-2. 執行緒:程序中一個負責程式執行的控制單元(執行路徑),每一個執行緒都有自己執行的內容,這個內容可以稱之為執行緒要執行的任務;
2-3. 併發、並行 併發程式設計:併發並行 併發(多執行緒操作同一個資源)1、Java預設有幾個執行緒?
答:2個,main主執行緒、GC執行緒
2、Java 真的可以開啟執行緒嗎?
答:不可以,只能通過本地方法呼叫底層的C++去操作
public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { start0(); started = true; }finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { } } } // 本地方法,底層的C++ ,Java 無法直接操作硬體 private native void start0();
- CPU一核,模擬出多條執行緒,快速交替
- CPU多核,多個執行緒可以同時執行
// 獲取CPU核數 // CPU密集型、IO密集型 Runtime.getRuntime().availableProcessors();
併發程式設計的本質:充分利用CPU的資源
2-4. 執行緒的幾個狀態public enum State { // 新生 NEW, // 執行 RUNNABLE, // 阻塞 BLOCKED, // 等待(死等,也是阻塞的等待) WAITING, // 超時等待(指定的時間內等待,過時不候) TIMED_WAITING, // 終止 TERMINATED; }2-5. sleep /wait區別
① 來自不同的類: - sleep來自Thread類;
- wait來自Thread類; ② 是否釋放鎖: - sleep方法不會釋放鎖;
- wait會方法釋放鎖,使得其他執行緒可以使用同步控制塊或者方法(鎖程式碼塊和方法鎖); ③ 使用範圍: - sleep可以在任何地方使用;
- wait,notify和notifyAll只能在同步控制方法或者同步控制塊裡面使用; ④ 異常捕獲: - sleep必須捕獲異常;
- wait,notify和notifyAll不需要捕獲異常;
3. Lock鎖同步
在JDK1.5之前,解決多執行緒安全問題有兩種方式:sychronized 隱式鎖
- 同步程式碼塊、同步方法
在JDK1.5之後,出現了更加靈活的方式:Lock 顯式鎖
- 同步鎖
3-1. Lock需要通過lock()方法上鎖,通過unlock()方法釋放鎖。為了保證鎖能釋放,所有unlock方法一般放在finally中去執行。
○ Lock實現類:Lock l = ...; l.lock(); try { // access the resource protected by this lock } finally { l.unlock(); }
○ReentrantLock:預設為非公平鎖
3-2. 再來看一下賣票案例: ● 傳統的Synchronized公平鎖:十分公平:可以先來後到
非公平鎖:十分不公平:可以插隊 (預設)
public class SaleTicketDemo01 { public static void main(String[] args) { // 併發:多執行緒操作同一個類,把資源類丟入執行緒就可以了 Ticket ticket = new Ticket(); // @FunctionalInterface 函式式介面,jdk1.8 lambda表示式 (引數)->{ 程式碼 } new Thread(() -> { for (int i = 0; i < 30; i++) ticket.sale(); }, "001").start(); new Thread(() -> { for (int i = 0; i < 30; i++) ticket.sale(); }, "002").start(); new Thread(() -> { for (int i = 0; i < 30; i++) ticket.sale(); }, "003").start(); } } // 資源類 OOP class Ticket { private int ticket = 30; // synchronized 本質: 佇列,鎖 public synchronized void sale() { if (ticket > 0) { System.out.println("視窗 " + Thread.currentThread().getName() + " 完成售票,餘票為:" + (--ticket)); } } }
多個執行緒同時操作共享資料ticket,所以會出現執行緒安全問題。會出現同一張票賣了好幾次或者票數為負數的情況。以前用同步程式碼塊和同步方法解決,現在看看用同步鎖怎麼解決。
● Lock介面:直接建立lock物件,然後用lock()方法上鎖,最後用unlock()方法釋放鎖即可。
public class SaleTicketDemo02 { public static void main(String[] args) { // 併發:多執行緒操作同一個類,把資源類丟入執行緒就可以了 Ticket2 ticket2 = new Ticket2(); new Thread(() -> { for (int i = 0; i < 30; i++) ticket2.sale(); }, "001").start(); new Thread(() -> { for (int i = 0; i < 30; i++) ticket2.sale(); }, "002").start(); new Thread(() -> { for (int i = 0; i < 30; i++) ticket2.sale(); }, "003").start(); } } // 資源類 class Ticket2 { private int ticket = 30; private Lock lock = new ReentrantLock(); // 1.建立lock鎖 public void sale() { lock.lock(); // 2.加鎖 try { if (ticket > 0) { System.out.println("Lock視窗 " + Thread.currentThread().getName() + " 完成售票,餘票為:" + (--ticket)); } } finally { lock.unlock(); // 3.解鎖 } } }
3-3.Synchronized 和 Lock 的區別
① Synchronized 內建的 Java 關鍵字;Lock 是一個 Java 類
②Synchronized 無法判斷獲取鎖的狀態;Lock 可以判斷是否獲取到了鎖
③Synchronized 會自動釋放;Lock 必須要手動釋放鎖!如果不釋放 會產生死鎖
④Synchronized 執行緒 1(獲得鎖,阻塞),執行緒 2(等待獲取,死等);Lock 鎖就不一定會等待下去
⑤Synchronized 可重入鎖,不可以中斷的,非公平;Lock 可重入鎖,可以判斷鎖,非公平(可以自己設定)
⑥ Synchronized 適合鎖少量的程式碼同步問題;Lock 適合鎖大量的同步程式碼!
4. 等待喚醒機制
4-1. 虛假喚醒問題
生產消費模式是等待喚醒機制的一個經典案例,看下面的程式碼:
public class TestProductorAndConsumer { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者01").start(); new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者02").start(); new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者03").start(); new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者04").start(); } } class Data { private int product = 0; // 共享資料 // product +1 public synchronized void increment() { while (product != 0) { try { this.wait(); // 等待 } catch (InterruptedException e) { e.printStackTrace(); } } product++; System.out.println(Thread.currentThread().getName() + " => " + product); // 通知其他執行緒,我 +1 完畢了 this.notifyAll(); } // product -1 public synchronized void decrement() { while (product == 0) { try { this.wait(); // 等待 } catch (InterruptedException e) { e.printStackTrace(); } } product--; System.out.println(Thread.currentThread().getName() + " => " + product); // 通知其他執行緒,我 -1 完畢了 this.notifyAll(); } }
● 問題存在:當存在 2 個以上的執行緒時,可能會產生虛假喚醒!
● 解決辦法:將 if 改為 while 判斷
// product +1 public synchronized void increment() throws InterruptedException { while (product != 0) this.wait(); // 等待 product++; System.out.println(Thread.currentThread().getName() + " => " + product); // 通知其他執行緒,我 +1 完畢了 this.notifyAll(); } // product -1 public synchronized void decrement() throws InterruptedException { while (product == 0) this.wait(); // 等待 product--; System.out.println(Thread.currentThread().getName() + " => " + product); // 通知其他執行緒,我 -1 完畢了 this.notifyAll(); }
只需要把 if 改成 while,每次都再去判斷一下,就可以了。
4-2. 用Lock鎖實現等待喚醒
public class TestProductorAndConsumer2 { public static void main(String[] args) { Data2 data = new Data2(); new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者01").start(); new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者02").start(); new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); }, "生產者03").start(); new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); }, "消費者04").start(); } } class Data2 { private int product = 0; // 共享資料 private Lock lock = new ReentrantLock(); // 建立鎖物件 Condition condition = lock.newCondition(); // 獲取鎖的監視器物件 // product +1 public void increment() { lock.lock(); // 加鎖 try { while (product != 0) condition.await(); // 等待 product++; System.out.println("Lock" + Thread.currentThread().getName() + " => " + product); condition.signalAll();// 通知其他執行緒,我 +1 完畢了 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖 } } // product -1 public void decrement() { lock.lock(); // 加鎖 try { while (product == 0) condition.await(); // 等待 product--; System.out.println("Lock" + Thread.currentThread().getName() + " => " + product); // 通知其他執行緒,我 -1 完畢了 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖 } } }
使用lock同步鎖,就不需要sychronized關鍵字了,需要建立lock物件和condition例項。condition的await()方法、signal()方法和signalAll()方法分別與wait()方法、notify()方法和notifyAll()方法對應。
4-3. 執行緒按序交替
首先來看一道題:
編寫一個程式,開啟 3 個執行緒,這三個執行緒的 ID 分別為 A、B、C, 每個執行緒將自己的 ID 在螢幕上列印 10 遍,要求輸出的結果必須按順序顯示。 如:ABCABCABC…… 依次遞迴分析:
執行緒本來是搶佔式進行的,要按序交替,所以必須實現執行緒通訊,
那就要用到等待喚醒。可以使用同步方法,也可以用同步鎖。
程式碼實現:
public class TestLoopPrint { public static void main(String[] args) { AlternationDemo alternationDemo = new AlternationDemo(); new Thread(()->{for (int i = 0; i < 10; i++) alternationDemo.loopA(); },"執行緒A").start(); new Thread(()->{for (int i = 0; i < 10; i++){alternationDemo.loopB();}},"執行緒B").start(); new Thread(()->{for (int i = 0; i < 10; i++){alternationDemo.loopC();}},"執行緒C").start(); } } class AlternationDemo { // 當前正在執行的執行緒的標記 private int number = 1; // 建立鎖物件,並且使用該鎖建立三個監視器 private Lock lock = new ReentrantLock(); Condition condition_1 = lock.newCondition(); Condition condition_2 = lock.newCondition(); Condition condition_3 = lock.newCondition(); public void loopA() { lock.lock(); try { while (number != 1) { // 判斷 condition_1.await(); // 等待 } System.out.println(Thread.currentThread().getName() + " => AAAAA"); // 喚醒指定的監視器2 number = 2; condition_2.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void loopB() { lock.lock(); try { while (number != 2) { // 判斷 condition_2.await(); // 等待 } System.out.println(Thread.currentThread().getName() + " => BBBBBB"); // 喚醒指定的監視器3 number = 3; condition_3.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void loopC() { lock.lock(); try { while (number != 3) { // 判斷 condition_3.await(); // 等待 } System.out.println(Thread.currentThread().getName() + " => CCCCCCC"); // 喚醒指定的監視器1 number = 1; condition_1.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }以上編碼就滿足需求。建立三個執行緒,分別呼叫loopA、loopB和loopC方法,這三個執行緒使用condition進行通訊。
5. 鎖的8種問題
1)標準情況按照普通的情況訪問同步方法,檢視輸出
public class Test1 { public static void main(String[] args) { Phone1 phone = new Phone1(); new Thread(() -> {phone.sendSms(); }, "A").start(); try { // 睡眠2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.call(); }, "B").start(); } } class Phone1 { public synchronized void sendSms() { System.out.println("發簡訊》》》"); } public synchronized void call() { System.out.println("打電話!!!"); } }View Code
- 執行結果:1、發簡訊 2、打電話
2)在其中一種方法中新增sleep方法訪問
在sendSms方法中新增sleep,檢視修改後的列印順序
class Phone1 { public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); // 睡4秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發簡訊》》》"); } public synchronized void call() { System.out.println("打電話!!!"); } }View Code
- 執行結果:1、發簡訊 2、打電話
3)新增一個未加鎖方法,檢視訪問結果synchronized 鎖的是物件this,兩個方法用的使用一個鎖,所以誰先拿到誰先執行
在Phone類中新增hello方法,同時執行緒修改為訪問同步方法和普通方法
public class Test2 { public static void main(String[] args) { Phone2 phone = new Phone2(); // 有鎖 new Thread(() -> { phone.sendSms(); }, "A").start(); try { // 睡眠2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } // 沒鎖 new Thread(() -> { phone.hello(); }, "B").start(); } } class Phone2 { public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發簡訊》》》"); } public synchronized void call() { System.out.println("打電話!!!"); } // 這裡沒有鎖,不是同步方法,不受鎖的影響 public void hello() { System.out.println("hello!"); } }View Code
- 執行結果:1、hello 2、發簡訊
4)執行時建立兩個不同物件,通過不同物件訪問加鎖的方法
在建立執行緒時,通過不同物件執行同步方法,檢視執行結果
public class Test2 { public static void main(String[] args) { // 兩個物件,兩個呼叫者,兩把鎖! Phone2 phone1 = new Phone2(); Phone2 phone2 = new Phone2(); // 有鎖 new Thread(() -> { phone1.sendSms(); }, "A").start(); try { // 睡眠2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } // 有鎖 new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone2 { // synchronized 是 物件鎖this public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發簡訊》》》"); } public synchronized void call() { System.out.println("打電話!!!"); } // 這裡沒有鎖 public void hello() { System.out.println("hello!"); } }View Code
- 執行結果:1、打電話 2、發簡訊
synchronized 鎖的是物件this。兩個物件,兩個呼叫者,兩把鎖!所以互不受影響
5)將加鎖的方法改為靜態方法,同一個物件執行
兩個同步方法都改為靜態方法,通過同一個物件執行方法,檢視執行結果
public class Test3 { public static void main(String[] args) { Phone3 phone = new Phone3(); new Thread(() -> { phone.sendSms(); }, "A").start(); try { // 睡眠2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.call(); }, "B").start(); } } class Phone3 { // static synchronized 靜態方法鎖 public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); // 睡眠4秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發簡訊》》》"); } public static synchronized void call() { System.out.println("打電話!!!"); } }View Code
- 執行結果:1、發簡訊 2、打電話
static synchronized靜態同步方法,類一載入就有了,所以鎖的是Class
6)通過兩個物件訪問靜態同步方法
public class Test3 { public static void main(String[] args) { // 兩個物件的Class類模板只有一個,static,鎖的是Class Phone3 phone1 = new Phone3(); Phone3 phone2 = new Phone3(); new Thread(() -> { phone1.sendSms(); }, "A").start(); try { // 睡眠2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone3 { // static synchronized 靜態方法,類一載入就有了,所以鎖的是Class public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); // 睡眠4秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發簡訊》》》"); } public static synchronized void call() { System.out.println("打電話!!!"); } }View Code
- 執行結果:1、打電話 2、發簡訊
兩個物件的Class類模板只有一個,鎖的是Class,只有一把鎖
7)一個靜態同步方法 一個普通同步方法,通過同一個物件執行
其中的一個方法去掉靜態關鍵字,檢視執行結果
public class Test4 { public static void main(String[] args) { Phone4 phone = new Phone4(); new Thread(() -> { phone.sendSms(); }, "A").start(); try { // 睡眠2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.call(); }, "B").start(); } } class Phone4 { // 靜態同步方法 鎖Class public static synchronized void sendSms() { try { // 睡眠4秒 TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發簡訊》》》"); } // 普通同步方法 鎖this public synchronized void call() { System.out.println("打電話!!!"); } }View Code
- 執行結果:1、打電話 2、發簡訊
靜態同步鎖 Class,普通同步鎖 this,兩把不同的鎖,所以互不受影響
8)一個靜態同步方法一個普通同步方法,兩個物件執行
public class Test4 { public static void main(String[] args) { Phone4 phone1 = new Phone4(); Phone4 phone2 = new Phone4(); new Thread(() -> { phone1.sendSms(); }, "A").start(); try { // 睡眠2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone4 { // 靜態同步方法 鎖Class public static synchronized void sendSms() { try { // 睡眠4秒 TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發簡訊》》》"); } // 普通同步方法 鎖this public synchronized void call() { System.out.println("打電話!!!"); } }View Code
- 執行結果:1、打電話 2、發簡訊
兩個物件 兩把不同的鎖,一個Class 一個this
● 總結:
① 物件鎖:使用 synchronized 修飾非靜態的方法以及 synchronized(this) 同步程式碼塊使用的鎖是物件鎖。
② 類鎖:使用 synchronized 修飾靜態的方法以及 synchronized(class) 同步程式碼塊使用的鎖是類鎖。
③ 私有鎖:在類內部宣告一個私有屬性如 private Object lock,在需要加鎖的同步塊使用 synchronized(lock)
它們的特性:
- 物件鎖具有可重入性。
- 當一個執行緒獲得了某個物件的物件鎖,則該執行緒仍然可以呼叫其他任何需要該物件鎖的 synchronized 方法或 synchronized(this) 同步程式碼塊。
- 當一個執行緒訪問某個物件的一個 synchronized(this) 同步程式碼塊時,其他執行緒對該物件中所有其它 synchronized(this) 同步程式碼塊的訪問將被阻塞,因為訪問的是同一個物件鎖。
- 每個類只有一個類鎖,但是類可以例項化成物件,因此每一個物件對應一個物件鎖。
- 普通方法與同步方鎖法無關。
- 類鎖和物件鎖不會產生競爭。
- 私有鎖和物件鎖也不會產生競爭。
- 使用私有鎖可以減小鎖的細粒度,減少由鎖產生的開銷。
6. 集合類不安全
多個執行緒共同操作一個執行緒不安全的類時報併發修改異常
6-1. List - 執行緒不安全
public class ListTest { public static void main(String[] args) { // List<String> list = new Vector<>(); // List<String> list = Collections.synchronizedList(new ArrayList<>()); List<String> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < 10; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(Thread.currentThread().getName() + " - " + list); }, String.valueOf(i)).start(); } } }● 解決方案:
① 使用 Vector 物件,實際上是加了 Synchronized 同步,實現同步需要很高的花費,效率較低
② 使用 Collections 集合工具類,給執行緒不安全的集合新增一層封裝
③ 寫入時複製 CopyOnWriteArrayList,加了寫鎖的集合,鎖住的是整個物件,但讀操作可以併發執行
- 寫入時複製:這種集合將資料放在固定的陣列中,任何資料的改變,都會重新建立一個新的陣列來記錄值。
- 這種集合被設計用在,讀多寫少的時候推薦使用!
6-2. Set - 執行緒不安全
public class SetTest { public static void main(String[] args) { // Set<String> set = Collections.synchronizedSet(new HashSet<>()); Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 0; i < 10; i++) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(Thread.currentThread().getName() + " - " + set); }, String.valueOf(i)).start(); } } }
● 解決方案:
① 使用 Collections 集合工具類,給執行緒不安全的集合新增一層封裝
② CopyOnWriteArraySet 寫入時複製,讀多寫少的時候推薦使用
● HashSet 底層:
// HashSet是基於HashMap實現的 public HashSet() { map = new HashMap<>(); } // add set 本質就是 map key 是無法重複的! public boolean add(E e) { return map.put(e, PRESENT)==null; } // PRESENT 是用來填充 map 中的 value,定義為 Object 型別。 private static final Object PRESENT = new Object();
6-3. Map - 執行緒不安全
public class MapTest { public static void main(String[] args) { // Map<String, String> map = Collections.synchronizedMap(new HashMap<>()); Map<String, String> map = new ConcurrentHashMap<>(); for (int i = 0; i < 30; i++) { int finalI = i; new Thread(() -> { map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(Thread.currentThread().getName() + " - " + map); }, String.valueOf(i)).start(); } } }
解決方案:
① 使用 Collections 集合工具類,給執行緒不安全的集合新增一層封裝
② 使用執行緒安全的 hash 表 ConcurrentHashMap,注意:1.7前採用是鎖分段機制,1.8後採用了CAS演算法
1、Map 是這樣用的嗎? 不是,工作中不使用 HashMap 2、預設等價於什麼? new HashMap<>(16, 0.75f) - 初始容量:static final int DEFAULT_INITIAL_CAPACITY = 1 << 4 - 載入因子:static final float DEFAULT_LOAD_FACTOR = 0.75f
7. 建立執行緒的方式 --- 實現Callable介面
● Callable:類似於Runnable
- call 方法有返回值,並且能夠丟擲異常。
- 有快取,結果可能需要等待,會阻塞!
● FutureTask:獲取 Callable 任務的返回值
使用 Future 我們可以得知 Callable 的執行狀態,以及獲取 Callable 執行完後的返回值。
Future 的方法介紹:
- get() :阻塞式,用於獲取 Callable/Runnable 執行完後的返回值。
- 帶時間引數的get()過載方法用於最多等待的時間後,如仍未返回則執行緒將繼續執行。
- cancel() :撤銷正在執行 Callable 的 Task。
- isDone():是否執行完畢。
- isCancelled():任務是否已經被取消。
● 使用例項:
public class CallableTest { public static void main(String[] args) { CallableDemo callableDemo = new CallableDemo(); for (int i = 0; i < 10; i++) { // 執行callable方式,需要FutureTask實現類的支援,用來接收運算結果 FutureTask<Integer> task = new FutureTask<>(callableDemo); // 執行執行緒任務 new Thread(task, String.valueOf(i)).start(); // 接收執行緒運算結果 try { // 需要等到執行緒執行完呼叫get方法才會執行,也可以用於閉鎖操作 System.out.println(task.get()); } catch (Exception e) { e.printStackTrace(); } } } } class CallableDemo implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName() + " => call()"); TimeUnit.SECONDS.sleep(1); // 睡眠1秒 return new Random().nextInt(100); } }
Callable介面和實現Runable介面的區別就是,Callable帶泛型,其call方法有返回值。使用的時候,需要用FutureTask來接收返回值。而且它也要等到執行緒執行完呼叫get方法才會執行,也可以用於閉鎖操作。
8. 讀寫鎖 -ReadWriterLock
我們在讀資料的時候,可以多個執行緒同時讀,不會出現問題,但是寫資料的時候,如果多個執行緒同時寫資料,那麼到底是寫入哪個執行緒的資料呢?
所以,如果有兩個執行緒,讀-寫 和寫-寫 需要互斥,讀-讀 不需要互斥。這個時候可以用讀寫鎖。
獨佔鎖(寫鎖):一次只能被一個執行緒佔有
共享鎖(讀鎖):多個執行緒可以同時佔有
程式碼實現:
public class ReadWriteLockDemo { public static void main(String[] args) { RWLTest rwlTest = new RWLTest(); // 寫入 for (int i = 0; i < 5; i++) { int finalI = i; new Thread(() -> { rwlTest.set(String.valueOf(finalI), finalI + ""); }, String.valueOf(i)).start(); } // 讀取 for (int i = 0; i < 5; i++) { int finalI = i; new Thread(() -> { rwlTest.get(String.valueOf(finalI)); }, String.valueOf(i)).start(); } } } class RWLTest { private volatile Map<String, Object> map = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 讀(可以多個執行緒同時操作) public void get(String key) { readWriteLock.readLock().lock();// 上鎖 try { System.out.println(Thread.currentThread().getName() + " -> 讀取 key[" + key + "]"); Object obj = map.get(key); System.out.println(Thread.currentThread().getName() + " -> 獲得 value[" + obj + "]"); } finally { readWriteLock.readLock().unlock();// 釋放鎖 } } // 寫(一次只能有一個執行緒操作) public void set(String key, Object value) { readWriteLock.writeLock().lock();// 上鎖 try { System.out.println(Thread.currentThread().getName() + " -> 寫入 key[" + key + "]"); map.put(key, value); System.out.println(Thread.currentThread().getName() + " -> 寫入成功!!!"); } finally { readWriteLock.writeLock().unlock();// 釋放鎖 } } }
執行結果:
9. 阻塞佇列 -BlockingQueue
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。
① 支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。
② 支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
9-1. 阻塞佇列的四種方法
方法\處理方式 | 丟擲異常 | 有返回值,不丟擲異常 | 阻塞等待 | 超時等待 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查隊首方法 | element() | peek() | 不可用 | 不可用 |
● 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
● 有返回值,不丟擲異常:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
● 阻塞等待:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
● 超時等待:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出等待。
程式碼演示:
public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // test01(); // test02(); // test03(); test04(); } /* 丟擲異常 */ public static void test01() { // 佇列的大小 ArrayBlockingQueue<String> queue = new ArrayBlockingQueue(3); System.out.println(queue.add("a")); System.out.println(queue.add("b")); System.out.println(queue.add("c")); // System.out.println(queue.add("d")); // 佇列已滿,丟擲異常 IllegalStateException System.out.println("隊首 = " + queue.element()); // 檢測隊首元素 System.out.println("=================="); System.out.println(queue.remove()); System.out.println(queue.remove()); System.out.println(queue.remove()); // System.out.println(queue.remove()); // 佇列置空,沒有元素,丟擲異常 NoSuchElementException } /* 有返回值,不丟擲異常 */ private static void test02() { ArrayBlockingQueue queue = new ArrayBlockingQueue(3); System.out.println(queue.offer("111")); System.out.println(queue.offer("222")); System.out.println(queue.offer("333")); System.out.println(queue.offer("444")); // false,不丟擲異常 System.out.println("隊首 = " + queue.peek()); // 檢測隊首元素 System.out.println("======================================"); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); // null,不丟擲異常 } /* 等待,阻塞(一直阻塞) */ private static void test03() throws InterruptedException { ArrayBlockingQueue queue = new ArrayBlockingQueue(3); // 一直阻塞 queue.put("AAA"); queue.put("BBB"); queue.put("CCC"); // queue.put("DDD"); // 佇列已滿,等待新增 -> 阻塞 System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); // System.out.println(queue.take()); // 佇列為空,等待獲取 -> 阻塞 } /* 有返回值,不丟擲異常,等待,阻塞(等待超時) */ private static void test04() throws InterruptedException { ArrayBlockingQueue queue = new ArrayBlockingQueue(3); System.out.println(queue.offer("A1", 2, TimeUnit.SECONDS)); System.out.println(queue.offer("B2", 2, TimeUnit.SECONDS)); System.out.println(queue.offer("C3", 2, TimeUnit.SECONDS)); System.out.println(queue.offer("D4", 2, TimeUnit.SECONDS)); // 等待超過兩秒,就返回false System.out.println("================================"); System.out.println(queue.poll(2, TimeUnit.SECONDS)); System.out.println(queue.poll(2, TimeUnit.SECONDS)); System.out.println(queue.poll(2, TimeUnit.SECONDS)); System.out.println(queue.poll(2, TimeUnit.SECONDS)); // 等待超過兩秒,就返回null } }BlockingQueueTest
9-2. Java裡的阻塞佇列
JDK7提供了7個阻塞佇列。分別是:
①ArrayBlockingQueue:是用陣列實現的有界阻塞佇列,初始化時必須傳入容量,是FIFO佇列,支援公平和非公平鎖。
② LinkedBlockingQueue:是用連結串列實現的有界阻塞佇列,初始化時如果沒有傳入容量,則容量時Intger.MAX_VALUE,是FIFO佇列,因為入隊和出隊方法各是一把鎖,所以一般情況下併發效能優於ArrayBlockingQueue。
③ LinkedBlockingDeque:是雙向連結串列實現的有界阻塞佇列,初始化時如果沒有傳入容量,則容量時Intger.MAX_VALUE,可以實現FIFO佇列,也可以實現LIFO佇列。
④ PriorityBlockingQueue:是陣列實現的具有優先順序的無界阻塞佇列,有兩個注意事項,
- 該佇列是具有優先順序的,不是按先進先出或者後進先出,是按優先順序的高低,所以入隊的物件必須是實現了Comparable介面的。
- 佇列雖然邏輯上是無界的,但因為是陣列實現的,不能無限擴容,作者在程式碼裡限制了,最大容量是Intger.MAX_VALUE-8。
⑤ DelayQueue:延遲佇列可以指定多久才能從佇列中獲取當前元素。只有延時期滿後才能從佇列中獲取元素, 元素必須是Delayed的子類
⑥ SynchronousQueue:不儲存元素的阻塞佇列,每一個put操作必須等待take操作,否則不能新增元素。支援公平鎖和非公平鎖。
⑦ LinkedTransferQueue:由連結串列結構組成的無界阻塞佇列,對比LinkedBlockingQueue除了有put、take等方法外,還提供了transfer方法,該方法會阻塞執行緒,直到元素被消費,才返回。
上面列舉了阻塞佇列的異同點,我們可以根據自己的業務場景選擇合適的阻塞佇列。
簡單介紹下其中兩個佇列:
9-2-1. ArrayBlockingQueue
public interface BlockingQueue<E> extends Queue<E> { //入隊,入隊失敗會丟擲異常 boolean add(E e); //入隊,入隊成功返回true,否則返回false boolean offer(E e); //入隊,入隊成功返回,否則進行等待 void put(E e) throws InterruptedException; //入隊,不成功等待一段時間,仍然不能入隊成功返回false,入隊成功返回true boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //出隊,佇列為空進行等待,否則將元素出隊 E take() throws InterruptedException; //出隊,佇列為空進行一段時間的等待,仍然為空返回null,否則將元素出隊 E poll(long timeout, TimeUnit unit) throws InterruptedException; //返回剩餘餘量 int remainingCapacity(); //將某個元素出隊,存在返回true,否則返回false boolean remove(Object o); //判斷是否包含某個元素 public boolean contains(Object o); //將佇列中所有可用元素出隊到集合C中 int drainTo(Collection<? super E> c); //將佇列中最多maxElements個可用元素出隊到集合C中 int drainTo(Collection<? super E> c, int maxElements); }BlockingQueue
ArrayBlockingQueue是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證訪問者公平的訪問佇列。
什麼是公平的訪問佇列?
所謂公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下程式碼建立一個公平的阻塞
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
訪問者的公平性是使用可重入鎖實現的,程式碼如下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
9-2-2. SynchronousQueue
SynchronousQueue 是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素。
適用場景:執行緒之間資料傳遞,一個執行緒使用過的資料,傳給另外一個執行緒使用。
宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。
public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
公平模式和非公平模式的區別:
- 公平模式:採用公平鎖,在獲取鎖時,增加了isFirst(current)判斷,當且僅當,等待佇列為空或當前執行緒是等待佇列的頭結點時,才可嘗試獲取鎖。
- 非公平模式(預設):採用非公平鎖,那些嘗試獲取鎖且尚未進入等待佇列的執行緒會和等待佇列head結點的執行緒發生競爭。
程式碼演示:
/** * 同步佇列 - SynchronousQueue * 和其它的BlockingQueue不一樣,SynchronousQueue不儲存元素 * put一個元素,必須take取出一個元素出來,否則不能再put進去 */ public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue queue = new SynchronousQueue(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " Put 111"); queue.put("111"); System.out.println(Thread.currentThread().getName() + " Put 222"); queue.put("222"); System.out.println(Thread.currentThread().getName() + " Put 333"); queue.put("333"); } catch (InterruptedException e) { e.printStackTrace(); } }, "T1").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " - Take " + queue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " - Take " + queue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " - Take " + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "T2").start(); } }SynchronousQueueDemo
10. 執行緒池
執行緒池就是首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一條執行緒來執行這個任務,執行結束以後,該執行緒並不會死亡,而是再次返回執行緒池中成為空閒狀態,等待執行下一個任務。
● 執行緒池工作機制:
① 線上程池的程式設計模式下,任務是提交給整個執行緒池,而不是直接提交給某個執行緒,執行緒池在拿到任務後,就在內部尋找是否有空閒的執行緒,如果有,則將任務交給某個空閒的執行緒。
② 一個執行緒同時只能執行一個任務,但可以同時向一個執行緒池提交多個任務。
● 為什麼使用?
多執行緒執行時間,系統不斷的啟動和關閉新執行緒,成本非常高,會過渡消耗系統資源,以及過渡切換執行緒的危險,從而可能導致系統資源的崩潰。這時,執行緒池就是最好的選擇了。
10-1. 執行緒池的好處
① 降低系統資源消耗。通過重用已存在的執行緒,降低執行緒建立和銷燬造成的消耗;
② 提高系統響應速度。當有任務到達時,通過複用已存在的執行緒,無需等待新執行緒的建立便能立即執行;
③ 方便執行緒併發數的管控。因為執行緒若是無限制的建立,可能會導致記憶體佔用過多而產生OOM,並且會造成cpu過度切換(cpu切換執行緒是有時間成本的(需要保持當前執行執行緒的現場,並恢復要執行執行緒的現場))。
④ 提供更強大的功能。延時執行、定時迴圈執行的策略等
10-2. java中提供的執行緒池
Executors類提供了4種不同的執行緒池:newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool
① newCachedThreadPool():建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。適用於負載較輕的場景,執行短期非同步任務。(可以使得任務快速得到執行,因為任務時間執行短,可以很快結束,也不會造成cpu過度切換)
可快取執行緒池:
- 執行緒數無限制
- 有空閒執行緒則複用空閒執行緒,若無空閒執行緒則新建執行緒
- 終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒
- 一定程式減少頻繁建立/銷燬執行緒,減少系統開銷
建立方法:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
原始碼:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
示例程式碼:
public class NewCachedThreadPoolTest { public static void main(String[] args) { // 建立一個可快取執行緒池 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { try { // sleep可明顯看到使用的是執行緒池裡面以前的執行緒,沒有建立新的執行緒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { public void run() { // 列印正在執行的快取執行緒資訊 System.out.println(Thread.currentThread().getName() + "正在被執行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }View Code
② newFiexedThreadPool(int Threads):建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
適用於負載較重的場景,對當前執行緒數量進行限制。(保證執行緒數可控,不會造成執行緒過多,導致系統負載更為嚴重)
定長執行緒池:
- 可控制執行緒最大併發數(同時執行的執行緒數)
- 超出的執行緒會在佇列中等待
建立方法:
//nThreads => 最大執行緒數即 maximumPoolSize ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads); //threadFactory => 建立執行緒的方 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
原始碼:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
示例程式碼:
public class NewFixedThreadPoolTest { public static void main(String[] args) { // 建立一個可重用固定個數的執行緒池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { fixedThreadPool.execute(new Runnable() { public void run() { try { // 列印正在執行的快取執行緒資訊 System.out.println(Thread.currentThread().getName() + "正在被執行"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }View Code
③SingleThreadExecutor():建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
適用於需要保證順序執行各個任務。
單執行緒化的執行緒池:
- 有且僅有一個工作執行緒執行任務
- 所有任務按照指定順序執行,即遵循佇列的入隊出隊規則
建立方法:
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
原始碼:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
示例程式碼:
public class NewSingleThreadExecutorTest { public static void main(String[] args) { //建立一個單執行緒化的執行緒池 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { public void run() { try { //結果依次輸出,相當於順序執行各個任務 System.out.println(Thread.currentThread().getName() + "正在被執行,列印的值是:" + index); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }View Code
④newScheduledThreadPool(int corePoolSize):建立一個定長執行緒池,支援定時及週期性任務執行。
適用於執行延時或者週期性任務。
定長執行緒池:
- 支援定時及週期性任務執行。
建立方法:
//nThreads => 最大執行緒數即maximumPoolSize ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
原始碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
示例程式碼:
public class NewScheduledThreadPoolTest { public static void main(String[] args) { //建立一個定長執行緒池,支援定時及週期性任務執行——延遲執行 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); //延遲1秒執行 /*scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("延遲1秒執行"); } }, 1, TimeUnit.SECONDS);*/ //延遲1秒後每3秒執行一次 scheduledThreadPool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("延遲1秒後每3秒執行一次"); } }, 1, 3, TimeUnit.SECONDS); } }View Code
10-3. ThreadPoolExecutor(重要)
在阿里巴巴開發手冊中,明確規定執行緒池不允許使用 Executors 去建立,所以我們需要使用 ThreadPoolExecutor 的方式建立執行緒池。
● ThreadPoolExecutor提供了四個建構函式:
// 五個構造引數
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // 六個構造引數 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } // 六個構造引數 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 七個構造引數,本質的ThreadPoolExecutor() public ThreadPoolExecutor(int corePoolSize, // 核心執行緒池大小 int maximumPoolSize,// 最大核心執行緒池大小 long keepAliveTime, // 超時了沒有人呼叫就會釋放 TimeUnit unit, // 超時單位 BlockingQueue<Runnable> workQueue, // 阻塞佇列 ThreadFactory threadFactory, // 執行緒工廠:建立執行緒的,一般不用動 RejectedExecutionHandler handler){ // 拒絕策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
● 執行緒池的主要引數:
① int corePoolSize:該執行緒池中核心執行緒數最大值
執行緒池新建執行緒的時候,如果當前執行緒總數小於corePoolSize,則新建的是核心執行緒,如果超過corePoolSize,則新建的是非核心執行緒
核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹(閒置狀態)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉
② int maximumPoolSize:該執行緒池中執行緒總數最大值
執行緒總數 = 核心執行緒數 + 非核心執行緒數。
③ long keepAliveTime:該執行緒池中非核心執行緒閒置超時時長
當執行緒池中執行緒數大於核心執行緒數時,執行緒的空閒時間如果超過執行緒存活時間,那麼這個執行緒就會被銷燬,直到執行緒池中的執行緒數小於等於核心執行緒數。
如果設定allowCoreThreadTimeOut = true,則會作用於核心執行緒
④ TimeUnit uni:keepAliveTime的超時單位,TimeUnit是一個列舉型別
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小時
DAYS : 天
⑤BlockingQueue<Runnable> workQueue:阻塞佇列
該執行緒池中的任務佇列:維護著等待執行的Runnable物件
當所有的核心執行緒都在幹活時,新新增的任務會被新增到這個佇列中等待處理,如果佇列滿了,則新建非核心執行緒執行任務
⑥ ThreadFactory threadFactory:建立執行緒的方式(一般用不上)
這是一個介面,你new他的時候需要實現他的 Thread newThread(Runnable r)方法
⑦ RejectedExecutionHandler handler:拒絕策略,執行緒池和佇列都滿了,再加入執行緒會執行此策略。
這玩意兒就是丟擲異常專用的,比如上面提到的兩個錯誤發生了,就會由這個handler丟擲異常,你不指定他也有個預設的
○ 程式碼演示
public class Demo01 { public static void main(String[] args) { ExecutorService threadExecutor = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() // 丟棄任務,並丟擲RejectedExecutionException異常 【 預設 】 ); try { // 最大承載:maximumPoolSize + workQueue,超過則丟擲 RejectedExecutionException 異常 for (int i = 0; i < 9; i++) { // 使用執行緒池建立執行緒 threadExecutor.execute(() -> { System.out.println(Thread.currentThread().getName() + " => OK"); }); } } finally { // 執行緒池用完,程式結束,關閉執行緒池 threadExecutor.shutdown(); } } }
10-4.執行緒池的四種拒絕策略
① ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
這是執行緒池預設的拒絕策略,在任務不能再提交的時候,丟擲異常,及時反饋程式執行狀態。
如果是比較關鍵的業務,推薦使用此拒絕策略,這樣子在系統不能承載更大的併發量的時候,能夠及時的通過異常發現。
② ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
如果任務被拒絕了,則由呼叫執行緒(提交任務的執行緒)直接執行此任務。
③ ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不丟擲異常。如果執行緒佇列已滿,則後續提交的任務都會被丟棄,且是靜默丟棄。
使用此策略,可能會使我們無法發現系統的異常狀態。
建議是一些無關緊要的業務採用此策略。
例如,本人的部落格網站統計閱讀量就是採用的這種拒絕策略。
④ ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新提交被拒絕的任務。
此拒絕策略,是一種喜新厭舊的拒絕策略。
是否要採用此種拒絕策略,還得根據實際業務是否允許丟棄老任務來認真衡量。
10-5. 如何配置執行緒池
● CPU密集型任務
儘量使用較小的執行緒池,一般為CPU核心數+1。 因為CPU密集型任務使得CPU使用率很高,若開過多的執行緒數,會造成CPU過度切換。
● IO密集型任務
可以使用稍大的執行緒池,一般為2*CPU核心數。 IO密集型任務CPU使用率並不高,因此可以讓CPU在等待IO的時候有其他執行緒去處理別的任務,充分利用CPU時間。
● 混合型任務
可以將任務分成IO密集型和CPU密集型任務,然後分別用不同的執行緒池去處理。 只要分完之後兩個任務的執行時間相差不大,那麼就會比序列執行來的高效。
因為如果劃分之後兩個任務執行時間有資料級的差距,那麼拆分沒有意義。
因為先執行完的任務就要等後執行完的任務,最終的時間仍然取決於後執行完的任務,而且還要加上任務拆分與合併的開銷,得不償失
11. 非同步函數語言程式設計 - CompletableFuture
使用 FutureFuture獲得非同步執行結果時,要麼呼叫阻塞方法 get() ,要麼輪詢看 isDone()是否為 true,這兩種方法都不是很好,因為主執行緒也會被迫等待。
從 Java 8 開始引入了 CompletableFuture,它針對 Future做了改進,可以傳入回撥物件,當非同步任務完成或者發生異常時,自動呼叫回撥物件的回撥方法。
11-1. 建立 CompletableFuture 物件
CompletableFuture 提供了四個靜態方法用來建立 CompletableFuture 物件
Asynsc 表示非同步,而 supplyAsync與 runAsync不同在與前者非同步返回一個結果,後者是 void 。第二個函式第二個引數表示是用我們自己建立的執行緒池,否則採用預設的 ForkJoinPool.commonPool()作為它的執行緒池.其中 Supplier是一個函式式介面,代表是一個生成者的意思,傳入 0 個引數,返回一個結果。- public static CompletableFuture<Void> runAsync(Runnable runnable)
- public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ return "hello world"; }); System.out.println(future.get()); //阻塞的獲取結果 ''helllo world"
11-2. Demo
public class Demo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 沒有返回值的 runAsync 非同步回撥 // runAsync(); // 有返回值的 supplyAsync 非同步回撥 supplyAsync(); } // 沒有返回值的 runAsync 非同步回撥 public static void runAsync() throws ExecutionException, InterruptedException {
// 建立非同步執行任務 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " runAsync => Void"); }); System.out.println("11111111"); // 獲取阻塞執行結果 future.get(); } // 有返回值的 supplyAsync 非同步回撥 public static void supplyAsync() throws ExecutionException, InterruptedException { // 建立非同步執行任務: CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " runAsync => Void"); // int i = 10 / 0; // 異常呼叫失敗回撥 return 2048; }); // 成功和失敗的回撥 Integer result = supplyAsync // 如果執行成功: .whenComplete((t, u) -> { System.out.println("t => " + t); // 正常的返回結果 System.out.println("u => " + u); // 錯誤描述資訊:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero }) // 如果執行異常: .exceptionally((e) -> { System.out.println(e.getMessage()); return 404; // 錯誤的返回結果 }) // 獲取執行結果 .get(); System.out.println("result => " + result); } }
12深入單例模式
單例模式(Singleton Pattern)是 Java 中最簡單的設計模式之一。這種型別的設計模式屬於建立型模式,它提供了一種建立物件的最佳方式。
這種模式涉及到一個單一的類,該類負責建立自己的物件,同時確保只有單個物件被建立。這個類提供了一種訪問其唯一的物件的方式,可以直接訪問,不需要例項化該類的物件。
注意:
- 1、單例類只能有一個例項。
- 2、單例類必須自己建立自己的唯一例項。
- 3、單例類必須給所有其他物件提供這一例項。
多種單例的特性:
單例模式 | 是否推薦 | 懶載入 | 反序列化單例 | 反射單例 | 克隆單例 | 效能、失效問題 |
餓漢式 | Eager載入推薦 | x | x | x | x | 類載入時就初始化,浪費記憶體。 |
懶漢式(同步方法) | x | √ | x | x |
x |
存在效能問題,每次獲取示例都會進行同步 |
雙重檢測鎖(DCL) | 可用 | √ | x | x | x |
JDK < 1.5 失效 |
靜態內部類 | 推薦 | √ | x | x | x | |
列舉 | 最推薦 | x | √ | √ | √ |
JDK < 1.5 不支援 自動支援序列化機制,絕對防止多次例項化 |
12-1. 單例模式的幾種實現方式
① 餓漢式:該模式在類被載入時就會例項化一個物件。
該模式能簡單快速的建立一個單例物件,而且是執行緒安全的(只在類載入時才會初始化,以後都不會)。但它有一個缺點,就是不管你要不要都會直接建立一個物件,會消耗一定的效能(當然很小很小,幾乎可以忽略不計,所以這種模式在很多場合十分常用而且十分簡單)
// 餓漢式單例 public class Hungry { // 在類裝載時就例項化 private static Hungry HUNGRY = new Hungry(); // 私有化構造方法 private Hungry() { } // 提供方法讓外部獲取例項 public static Hungry getInstance() { return HUNGRY; } }
這種做法很方便的幫我們解決了多執行緒例項化的問題,但是缺點也很明顯。
因為這句程式碼 private static Hungry HUNGRY = new Hungry(); 的關係,所以該類一旦被jvm載入就會馬上例項化!
那如果我們不想用這個類怎麼辦呢? 是不是就浪費了呢?既然這樣,我們來看下替代方案! 懶漢式。
② 懶漢式:該模式只在你需要物件時才會生成單例物件(比如呼叫getInstance方法)
這種方式具備很好的 lazy loading,能夠在多執行緒中很好的工作,但是,效率很低,99% 情況下不需要同步。
public class LazyMan { private static LazyMan LAZY_MAN; private LazyMan() { } public synchronized static LazyMan getInstance() { if (LAZY_MAN== null) return new LazyMan(); return LAZY_MAN; } }
從執行緒安全性上講,不加同步的懶漢式是執行緒不安全的,比如說:有兩個執行緒,一個是執行緒A,一個是執行緒B,它們同時呼叫getInstance方法,那就可能導致併發問題。
所以只要加上 Synchronized 即可,但是這樣一來,會降低整個訪問的速度,而且每次都要判斷,也確實是稍微慢點。
那麼有沒有更好的方式來實現呢?雙重檢查加鎖,可以使用“雙重檢查加鎖”的方式來實現,就可以既實現執行緒安全,又能夠使效能不受到大的影響。
③雙檢鎖/雙重校驗鎖(DCL,即 double-checked locking):這種方式採用雙鎖機制,安全且在多執行緒情況下能保持高效能。
雙重檢查加鎖機制並不是每次進入getInstance方法都需要同步,而是先不同步,進入方法過後,先檢查例項是否存在,如果不存在才進入下面的同步塊,這是第一重檢查。進入同步塊過後,再次檢查例項是否存在,如果不存在,就在同步的情況下建立一個例項,這是第二重檢查。這樣一來,就只需要同步一次了,從而減少了多次在同步情況下進行判斷所浪費的時間。
public class DoubleCheck { private volatile static DoubleCheck DOUBLE_CHECK; private DoubleCheck() { } public static DoubleCheck getInstance() { // 先檢查例項是否存在,如果不存在才進入下面同步塊 if (DOUBLE_CHECK == null) {
// 同步塊,執行緒安全的建立例項 synchronized (DoubleCheck.class) { // 再次檢查例項是否存在,如果不存在則建立例項
if (DOUBLE_CHECK == null) return new DoubleCheck(); } } return DOUBLE_CHECK; } }
volatile關鍵字:將不會被本地執行緒快取,所有對該變數的讀寫都是直接操作共享記憶體,從而確保多個執行緒能正確的處理該變數。
注意:在Java1.4及以前版本中,很多JVM對於volatile關鍵字的實現有問題,會導致雙重檢查加鎖的失敗,因此雙重檢查加鎖的機制只能用在Java5及以上的版本。
這種實現方式既可使實現執行緒安全的建立例項,又不會對效能造成太大的影響,它 只在第一次建立例項的時候同步,以後就不需要同步了,從而加快執行速度。
但由於 volatile 關鍵字可能會遮蔽掉虛擬機器中一些必要的程式碼優化,所以執行效率並不是很高 ,因此一般建議,沒有特別的需要,不要使用。
也就是說,雖然可以使用雙重加鎖機制來實現執行緒安全的單例,但並不建議大量採用,根據情況來選用吧。
④ 靜態內部類:採用類級內部類,在這個類級內部類裡面去建立物件例項,只要不使用到這個類級內部類,那就不會建立物件例項
這種方式能達到雙檢鎖方式一樣的功效,但實現更簡單。對靜態域使用延遲初始化,應使用這種方式而不是雙檢鎖方式。這種方式只適用於靜態域的情況,雙檢鎖方式可在例項域需要延遲初始化時使用。
當getInstance方法第一次被呼叫的時候,它第一次讀取SingletonHolder.instance,導致SingletonHolder類得到初始化;而這個類在裝載並被初始化的時候,會初始化它的靜態域,從而建立Singleton的例項,由於是靜態的域,因此只會被虛擬機器在裝載類的時候初始化一次,並由虛擬機器來保證它的執行緒安全性。
public class Holder { // 私有構造方法 private Holder() { } /** * 類級的內部類,也就是靜態的成員式內部類,該內部類的例項與外部類的例項 * 沒有繫結關係,而且只有被呼叫到才會裝載,從而實現了延遲載入 */ private static class InnerClass { // 靜態初始化器,由JVM來保證執行緒安全 private static final Holder HOLDER = new Holder(); } public static final Holder getInstance() { return Holder.getInstance(); } }
這個模式的優勢在於,getInstance 方法並沒有被同步,並且只是執行一個域的訪問,因此延遲初始化並沒有增加任何訪問成本。
⑤ 列舉:列舉實現單例是最為推薦的一種方法,因為它更簡潔並且就算通過序列化,反射等也沒辦法破壞單例性
- Java的列舉型別實質上是功能齊全的類,因此可以有自己的屬性和方法;
- Java列舉型別的基本思想:通過公有的靜態final域為每個列舉常量匯出例項的類;
- 從某個角度講,列舉是單例的泛型化,本質上是單元素的列舉;
這種方式是 Effective Java 作者 Josh Bloch 提倡的方式,它不僅能避免多執行緒同步問題,而且還自動支援序列化機制,防止反序列化重新建立新的物件,絕對防止多次例項化。不過,由於 JDK1.5 之後才加入 enum 特性,用這種方式寫不免讓人感覺生疏,在實際工作中,也很少用。
不能通過 reflection attack 來呼叫私有構造方法。
public enum EnumSingle { INSTANCE; public EnumSingle getInstance() { return INSTANCE; } }
使用列舉來實現單例項控制,會更加簡潔,而且無償的提供了序列化的機制,並由JVM從根本上提供保障,絕對防止多次例項化,是更簡潔、高效、安全的實現單例的方式。
13. 公平鎖、非公平鎖、重入鎖、自旋鎖
13-1. 公平鎖、非公平鎖
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
1)是什麼
公平鎖就是先來後到、非公平鎖就是允許加塞, Lock lock = new ReentrantLock(Boolean fair); 預設非公平。
-
公平鎖:多個執行緒按照申請鎖的順序來獲取鎖,類似排隊打飯。
-
非公平鎖:多個執行緒獲取鎖的順序並不是按照申請鎖的順序,有可能後申請的執行緒優先獲取鎖,在高併發的情況下,有可能會造成優先順序反轉或者節現象。
2)兩者區別
-
公平鎖:Threads acquire a fair lock in the order in which they requested it
公平鎖,就是很公平,在併發環境中,每個執行緒在獲取鎖時,會先檢視此鎖維護的等待佇列,如果為空,或者當前執行緒就是等待佇列的第一個,就佔有鎖,否則就會加入到等待佇列中,以後會按照FIFO的規則從佇列中取到自己。
-
非公平鎖:a nonfair lock permits barging: threads requesting a lock can jump ahead of the queue of waiting threads if the lock happens to be available when it is requested.
非公平鎖比較粗魯,上來就直接嘗試佔有額,如果嘗試失敗,就再採用類似公平鎖那種方式。
3)other
對 Java ReentrantLock 而言,通過建構函式指定該鎖是否公平,預設是非公平鎖,非公平鎖的優點在於吞吐量比公平鎖大。
對 Synchronized 而言,是一種非公平鎖。
15-2. 重入鎖(遞迴鎖)
1)遞迴鎖是什麼
指的時同一執行緒外層函式獲得鎖之後,內層遞迴函式仍然能獲取該鎖的程式碼,在同一個執行緒在外層方法獲取鎖的時候,在進入內層方法會自動獲取鎖,也就是說,執行緒可以進入任何一個它已經擁有的鎖所同步著的程式碼塊
2)ReentrantLock / Synchronized 就是一個典型的可重入鎖
3)可重入鎖最大的作用是避免死鎖
4)程式碼示例
● synchronized
public class Demo01 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sms(); }, "A").start(); new Thread(() -> { phone.call(); }, "B").start(); } } class Phone { // 鎖 1 public synchronized void sms() { System.out.println(Thread.currentThread().getName() + " => sms"); try { // TimeUnit.SECONDS.sleep(3); // 執行緒睡眠,鎖不釋放 // wait(); // 執行緒等待,釋放鎖 } catch (Exception e) { e.printStackTrace(); } call(); // 鎖2 - 這裡也有鎖 } // 鎖 2 public synchronized void call() { System.out.println(Thread.currentThread().getName() + " => call"); // notify(); // 喚醒 A } }
● ReentrantLock
public class Demo02 { public static void main(String[] args) { Phone2 phone2 = new Phone2(); new Thread(() -> { phone2.sms(); }, "A").start(); new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone2 { Lock lock = new ReentrantLock(); public void sms() { lock.lock(); //