Java 多執行緒併發程式設計
導讀
創作不易,禁止轉載!
併發程式設計簡介
發展歷程
早起計算機,從頭到尾執行一個程式,這樣就嚴重造成資源的浪費。然後作業系統就出現了,計算機能執行多個程式,不同的程式在不同的單獨的程序中執行,一個程序,有多個執行緒,提高資源的利用率。ok,如果以上你還不瞭解的話,我這裡有2個腦補連結(點我直達1、點我直達2)
簡介(百度百科)
所謂併發程式設計是指在一臺處理器上“同時”處理多個任務。併發是在同一實體上的多個事件。多個事件在同一時間間隔發生。
目標(百度百科)
併發程式設計的目標是充分的利用處理器的每一個核,以達到最高的處理效能。
序列與並行的區別
可能這個栗子不是很恰當,仁者見仁智者見智。智者get到點,愚者咬文爵字,啊!你這個栗子不行,不切合實際,巴拉巴拉 .....為啥加起來是2小時6分鐘,吃飯不要時間麼(洗衣服:把要洗的衣服塞到洗衣機,包括倒洗衣液等等3分鐘;做飯:同理),你大爺的,吃飯的時候不能看電影嘛。好了,請出門右轉,這裡不歡迎槓精,走之前把門關上!!!通過這個栗子,可以看出做相同的事情,所花費的時間不同(這就是為啥工作中,每個人的工作效率有高低了叭)。
什麼時候適合併發程式設計
- 任務阻塞執行緒,導致之後的程式碼不能執行:一邊從檔案中讀取,一邊進行大量計算
- 任務執行時間過長,可以瓜分為分工明確的子任務:分段下載檔案
- 任務間斷性執行:日誌列印
- 任務協作執行:生產者消費者問題
併發程式設計中的上下文切換
以下內容,百度百科原話(點我直達)。
上下文切換指的是核心(作業系統的核心)在CPU上對程序或者執行緒進行切換。上下文切換過程中的資訊被儲存在程序控制塊(PCB-Process Control Block)中。PCB又被稱作切換楨(SwitchFrame)。上下文切換的資訊會一直被儲存在CPU的記憶體中,直到被再次使用。
最重要的一句話:上下文頻繁的切換,會帶來一定的效能開銷。
減少上下文切換開銷方法
- 無鎖併發程式設計
- 多執行緒競爭鎖時,會引起上下文切換,所以多個執行緒處理資料時,可以用一些辦法來避免使用鎖,如將資料的ID按照Hash演算法取模分段,不同的執行緒處理不同段的資料
- CAS
- Java的Atomic包使用CAS演算法來更新資料,而不需要加鎖
- 控制執行緒數
- 避免建立過多不需要的執行緒,當任務少的時候,但是建立很多執行緒來處理,這樣會造成大量執行緒都處於等待狀態
協程(GO語言)
- 在單執行緒裡實現多工的排程,並在單執行緒裡維持多個任務間的切換。
知乎上,有個人寫的不錯,推薦給大家:點我直達
死鎖(程式碼演示)
第一次執行,沒有發生死鎖,第二次執行時,先讓執行緒A睡眠50毫秒,程式一直卡著不動,發生死鎖。你不讓我,我不讓你,爭奪YB_B的資源。
檢視死鎖(在重要不過啦)(jdk提供的一些工具)
- 命令列工具:jps
- 檢視堆疊:jstack pid
- 視覺化工具:jconsole
jps&jstack
分析
jconsole
控制檯輸入:jconsole,然後按照gif,看執行緒->檢測死鎖
程式碼拷貝區
package com.yb.thread; /** * @ClassName:DeadLockDemo * @Description:死鎖程式碼演示 * @Author:chenyb * @Date:2020/9/7 10:23 下午 * @Versiion:1.0 */ public class DeadLockDemo { private static final Object YB_A=new Object(); private static final Object YB_B=new Object(); public static void main(String[] args) { new Thread(()->{ synchronized (YB_A){ try { //讓執行緒睡眠50毫秒 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (YB_B){ System.out.println("執行緒-AAAAAAAAAAAAA"); } } }).start(); new Thread(()->{ synchronized (YB_B){ synchronized (YB_A){ System.out.println("執行緒-BBBBBBBBBBBBB"); } } }).start(); } }
執行緒基礎
程序與執行緒的區別
程序:是系統進行分配和管理資源的基本單位
執行緒:程序的一個執行單元,是程序內排程的實體、是CPU排程和分派的基本單位,是比程序更小的獨立執行的基本單位。執行緒也被稱為輕量級程序,執行緒是程式執行的最小單位。
一個程式至少一個程序,一個程序至少一個執行緒。
執行緒的狀態(列舉)
- 初始化(NEW)
- 新建了一個執行緒物件,但還沒有呼叫start()方法
- 執行(RUNNABLE)
- 處於可執行狀態的執行緒正在JVM中執行,但他可能正在等待來自作業系統的其他資源
- 阻塞(BLOCKED)
- 執行緒阻塞與synchronized鎖,等待獲取synchronized鎖的狀態
- 等待(WAITING)
- Object.wait()、join()、LockSupport.part(),進入該狀態的執行緒需要等待其他執行緒做出一些特定動作(通知|中斷)
- 超時等待(TIME_WAITING)
- Object.wait(long)、Thread.join()、LockSupport.parkNanos()、LockSupport.parkUntil,該狀態不同於WAITING
- 終止(TERMINATED)
- 該執行緒已經執行完畢
建立執行緒
方式一
方式二(推薦)
好處
- java只能單繼承,但是介面可以繼承多個
- 增加程式的健壯性,程式碼可以共享
注意事項
方式三(匿名內部類)
方式四(Lambada)
方式五(執行緒池)
注意:程式還未關閉!!!!
執行緒的掛起與恢復
方式一(不推薦)
不推薦使用,會造成死鎖~
方式二(推薦)
wait():暫停執行,放棄已獲得的鎖,進入等待狀態
notify():隨機喚醒一個在等待鎖的執行緒
notifyAll():喚醒所有在等待鎖的執行緒,自行搶佔CPU資源
執行緒的中斷
方式一(不推薦)
注意:使用stop()可以中斷執行緒,但是會帶來執行緒不安全問題(stop被呼叫,執行緒立刻停止),理論上numA和numB都是1,結果numB=0;還是沒搞明白的,給你個眼神,自己體會~
方式二(推薦)
方式三(更推薦)
執行緒優先順序
執行緒的優先順序告訴程式該執行緒的重要程度有多大。如果有大量執行緒都被阻塞,都在等候執行,程式會盡可能地先執行優先順序的那個執行緒。但是,這並不表示優先順序較低的執行緒不會執行。若執行緒的優先順序較低,只不過表示它被准許的機會小一些而已。
執行緒的優先順序
- 最小=1
- 最大=10
- 預設=5
驗證
可以看出,列印執行緒2的機率比較大,因為執行緒優先順序高。執行緒優先順序,推薦使用(不同平臺對執行緒的優先順序支援不同):1、5、10
守護執行緒(不建議使用)
任何一個守護執行緒都是整個程式中所有使用者執行緒的守護者,只要有活著的使用者執行緒,守護執行緒就活著。
執行緒安全性
synchronized
點我直達
修改普通方法:鎖住物件的例項
修飾靜態方法:鎖住整個類
修改程式碼塊:鎖住一個物件synchronized (lock)
volatile
僅能修飾變數,保證該物件的可見性(多執行緒共享的變數),不保證原子性。
用途
- 執行緒開關
- 單例修改物件的例項
鎖
lock的使用
lock與synchronized區別
lock:需要手動設定加鎖和釋放鎖
synchronized:託管給jvm執行
檢視lock的實現類有哪些
多執行緒下除錯
注意看圖,執行緒1、2、3的狀態:Runnable|wailting,還沒get到點的話,你真的要反思一下了
讀寫鎖
讀寫互斥、寫寫互斥、讀讀不互斥
如果要想debug除錯檢視效果,可開2個執行緒,一個自增,一個輸出
package com.yb.thread.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @ClassName:ReentrantReadWriteLockDemo * @Description:讀寫鎖 * @Author:chenyb * @Date:2020/9/26 3:14 下午 * @Versiion:1.0 */ public class ReentrantReadWriteLockDemo { private int num_1 = 0; private int num_2 = 0; private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //讀鎖 private Lock readLock = lock.readLock(); //寫鎖 private Lock writeLock = lock.writeLock(); public void out() { readLock.lock(); try { System.out.println(Thread.currentThread().getName() + "num1====>" + num_1 + ";num_2======>" + num_2); } finally { readLock.unlock(); } } public void inCreate() { writeLock.lock(); try { num_1++; try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } num_2++; } finally { writeLock.unlock(); } } public static void main(String[] args) { ReentrantReadWriteLockDemo rd = new ReentrantReadWriteLockDemo(); // for(int x=0;x<3;x++){ // new Thread(()->{ // rd.inCreate(); // rd.out(); // }).start(); // } //=========讀寫互斥 new Thread(() -> { rd.inCreate(); }, "寫").start(); new Thread(() -> { rd.out(); }, "讀").start(); //========寫寫互斥 new Thread(() -> { rd.inCreate(); }, "寫1").start(); new Thread(() -> { rd.inCreate(); }, "寫2").start(); //==========讀讀不互斥 new Thread(() -> { rd.out(); }, "讀1").start(); new Thread(() -> { rd.out(); }, "讀2").start(); } }
鎖降級
寫執行緒獲取寫鎖後可以獲取讀鎖,然後釋放寫鎖,這樣寫鎖變成了讀鎖,從而實現鎖降級。
注:鎖降級之後,寫鎖不會直接降級成讀鎖,不會隨著讀鎖的釋放而釋放,因此要顯示地釋放寫鎖。
用途
用於對資料比較敏感,需要在對資料修改之後,獲取到修改後的值,並進行接下來的其他操作。理論上已經會輸入依據:“num=1”,實際多執行緒下沒輸出,此時可以用鎖降級解決。給你個眼神,自己體會
package com.yb.thread.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @ClassName:LockDegradeDemo * @Description:鎖降級demo * @Author:chenyb * @Date:2020/9/26 10:53 下午 * @Versiion:1.0 */ public class LockDegradeDemo { private int num = 0; //讀寫鎖 private ReentrantReadWriteLock readWriteLOck = new ReentrantReadWriteLock(); Lock readLock = readWriteLOck.readLock(); Lock writeLock = readWriteLOck.writeLock(); public void doSomething() { //寫鎖 writeLock.lock(); //讀鎖 readLock.lock(); try { num++; } finally { //釋放寫鎖 writeLock.unlock(); } //模擬其他複雜操作 try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } try { if (num == 1) { System.out.println("num=" + num); } else { System.out.println(num); } } finally { //釋放度鎖 readLock.unlock(); } } public static void main(String[] args) { LockDegradeDemo ld = new LockDegradeDemo(); for (int i = 0; i < 4; i++) { new Thread(() -> { ld.doSomething(); }).start(); } } }
鎖升級?
注:從圖可以看出,執行緒卡著,驗證不存在先讀後寫,從而不存在鎖升級這種說法
StampedLock鎖
簡介
一般應用,都是讀多寫少,ReentrantReadWriteLock,因為讀寫互斥,所以讀時阻塞寫,效能提不上去。可能會使寫執行緒飢餓
特點
- 不可重入:一個執行緒已經持有寫鎖,再去獲取寫鎖的話,就會造成死鎖
- 支援鎖升級、降級
- 可以樂觀讀也可以悲觀讀
- 使用有限次自旋,增加鎖獲得的機率,避免上下文切換帶來的開銷,樂觀讀不阻塞寫操作,悲觀讀,阻塞寫
優點
相比於ReentrantReadWriteLock,吞吐量大幅提升
缺點
- api複雜,容易用錯
- 實現原理相比於ReentrantReadWriteLock複雜的多
demo
package com.yb.thread.lock; import java.util.concurrent.locks.StampedLock; /** * @ClassName:StampedLockDemo * @Description:官方例子 * @Author:chenyb * @Date:2020/9/26 11:37 下午 * @Versiion:1.0 */ public class StampedLockDemo { //成員變數 private double x, y; //鎖例項 private final StampedLock sl = new StampedLock(); //排它鎖-寫鎖(writeLock) void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } //樂觀讀鎖 double distanceFromOrigin() { //嘗試獲取樂觀鎖1 long stam = sl.tryOptimisticRead(); //將全部變數拷貝到方法體棧內2 double currentX = x, currentY = y; //檢查在1獲取到讀鎖票據後,鎖有沒被其他寫執行緒排他性搶佔3 if (!sl.validate(stam)) { //如果被搶佔則獲取一個共享讀鎖(悲觀獲取)4 stam = sl.readLock(); try { //將全部變數拷貝到方法體棧內5 currentX = x; currentY = y; } finally { //釋放共享讀鎖6 sl.unlockRead(stam); } } //返回計算結果7 return Math.sqrt(currentX * currentX + currentY * currentY); } //使用悲觀鎖獲取讀鎖,並嘗試轉換為寫鎖 void moveIfAtOrigin(double newX, double newY) { //這裡可以使用樂觀讀鎖替換1 long stamp = sl.readLock(); try { //如果當前點遠點則移動2 while (x == 0.0 && y == 0.0) { //嘗試將獲取的讀鎖升級為寫鎖3 long ws = sl.tryConvertToWriteLock(stamp); //升級成功後,則更新票據,並設定座標值,然後退出迴圈4 if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { //讀鎖升級寫鎖失敗則釋放讀鎖,顯示獲取獨佔寫鎖,然後迴圈重試5 sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { //釋放鎖6 sl.unlock(stamp); } } }
生產者消費者模型
Consumer.java
package com.yb.thread.communication; /** * 消費者 */ public class Consumer implements Runnable { private Medium medium; public Consumer(Medium medium) { this.medium = medium; } @Override public void run() { while (true) { medium.take(); } } }
Producer.java
package com.yb.thread.communication; /** * 生產者 */ public class Producer implements Runnable { private Medium medium; public Producer(Medium medium) { this.medium = medium; } @Override public void run() { while (true) { medium.put(); } } }
Medium.java
package com.yb.thread.communication; /** * 中間商 */ public class Medium { //生產個數 private int num = 0; //最多生產數 private static final int TOTAL = 20; /** * 接受生產資料 */ public synchronized void put() { //判斷當前庫存,是否最大庫存容量 //如果不是,生產完成之後,通知消費者消費 //如果是,通知生產者進行等待 if (num < TOTAL) { System.out.println("新增庫存--------當前庫存" + ++num); //喚醒所有執行緒 notifyAll(); } else { try { System.out.println("新增庫存-----庫存已滿" + num); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 獲取消費資料 */ public synchronized void take() { //判斷當前庫存是否不足 //如果充足,在消費完成之後,通知生產者進行生產 //如果不足,通知消費者暫停消費 if (num > 0) { System.out.println("消費庫存-------當前庫存容量" + --num); //喚醒所有執行緒 notifyAll(); } else { System.out.println("消費庫存--------庫存不足" + num); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
測試
管道流通訊
以記憶體為媒介,用於執行緒之間的資料傳輸
面向位元組:PipedOutputStream、PipedInputStream
面向字元:PipedReader、PipedWriter
Reader.java
package com.yb.thread.communication.demo; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PipedInputStream; import java.util.stream.Collectors; /** * @ClassName:Reader * @Description:TODO * @Author:chenyb * @Date:2020/9/27 10:22 下午 * @Versiion:1.0 */ public class Reader implements Runnable{ private PipedInputStream pipedInputStream; public Reader(PipedInputStream pipedInputStream){ this.pipedInputStream=pipedInputStream; } @Override public void run() { if (pipedInputStream!=null){ String collect = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n")); System.out.println(collect); } //關閉流 try { pipedInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }
Main.java
package com.yb.thread.communication.demo; import java.io.*; /** * @ClassName:Main * @Description:TODO * @Author:chenyb * @Date:2020/9/27 10:22 下午 * @Versiion:1.0 */ public class Main { public static void main(String[] args) { PipedInputStream pipedInputStream = new PipedInputStream(); PipedOutputStream pipedOutputStream = new PipedOutputStream(); try { pipedOutputStream.connect(pipedInputStream); } catch (IOException e) { e.printStackTrace(); } new Thread(new Reader(pipedInputStream)).start(); BufferedReader bufferedReader = null; try { bufferedReader = new BufferedReader(new InputStreamReader(System.in)); pipedOutputStream.write(bufferedReader.readLine().getBytes()); } catch (IOException e) { e.printStackTrace(); } finally { try { pipedOutputStream.close(); if (bufferedReader!=null){ bufferedReader.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
測試
Thread.join
執行緒A執行一半,需要資料,這個資料需要執行緒B去執行修改,B修改完成後,A才繼續操作
演示
ThreadLocal
執行緒變數,是一個以ThreadLocal物件為鍵、任意物件為值的儲存結構。
1、ThreadLocal.get: 獲取ThreadLocal中當前執行緒共享變數的值。
2、ThreadLocal.set: 設定ThreadLocal中當前執行緒共享變數的值。
3、ThreadLocal.remove: 移除ThreadLocal中當前執行緒共享變數的值。
4、ThreadLocal.initialValue: ThreadLocal沒有被當前執行緒賦值時或當前執行緒剛呼叫remove方法後呼叫get方法,返回此方法值。
原子類
概念
對多執行緒訪問同一個變數,我們需要加鎖,而鎖是比較消耗效能的,JDK1.5之後,新增的原子操作類提供了一種用法簡單、效能高效、執行緒安全地更新一個變數的方式,這些類同樣位於JUC包下的atomic包下,發展到JDK1.8,該包下共有17個類,囊括了原子更新基本型別、原子更新陣列、原子更新屬性、原子更新引用。
1.8新增的原子類
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
- Striped64
原子更新基本型別
JDK1.8之前有以下幾個
- AtomicBoolean
- AtomicInteger
- AtomicLong
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
大致3類
- 元老級的原子更新,方法幾乎一模一樣:AtomicBoolean、AtomicInteger、AtomicLong
- 對Double、Long原子更新效能進行優化提升:DoubleAdder、LongAdder
- 支援自定義運算:DoubleAccumulator、LongAccumulator
演示
元老級
自定義運算
原子更新陣列
JDK1.8之前大概有以下幾個
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
原子更新屬性
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
- AtomicStampedReference
- AtomicReferenceFieldUpdater
原子更新引用
- AtomicReference:用於對引用的原子更新
- AtomicMarkableReference:帶版本戳的原子引用型別,版本戳為boolean型別
- AtomicStampedReference:帶版本戳的原子引用型別,版本戳為int型別
容器
同步容器
Vector、HashTable:JDK提供的同步容器類
Collections.SynchronizedXXX:對相應容器進行包裝
缺點
在單獨使用裡面的方法的時候,可以保證執行緒安全,但是,複合操作需要額外加鎖來保證執行緒安全,使用Iterator迭代容器或使用for-each遍歷容器,在迭代過程中修改容器會拋ConcurrentModificationException異常。想要避免出現這個異常,就必須在迭代過程持有容器的鎖。但是若容器較大,則迭代的時間也會較長。那麼需要訪問該容器的其他執行緒將會長時間等待。從而極大降低效能。
若不希望在迭代期間對容器加鎖,可以使用“克隆”容器的方式。使用執行緒封閉,由於其他執行緒不會對容器進行修改,可以避免ConcurrentModificationException。但是在建立副本的時候,存在較大效能開銷。toString、hashCode、equalse、containsAll、removeAll、retainAll等方法都會隱式的Iterate,也即可能丟擲ConcurrentModificationException。
package com.yb.thread.container; import java.util.Iterator; import java.util.Vector; /** * @ClassName:VectorDemo * @Description:TODO * @Author:chenyb * @Date:2020/9/29 9:35 下午 * @Versiion:1.0 */ public class VectorDemo { public static void main(String[] args) { Vector<String> strings = new Vector<>(); for (int i = 0; i <1000 ; i++) { strings.add("demo"+i); } //錯誤遍歷 // strings.forEach(e->{ // if (e.equals("demo3")){ // strings.remove(e); // } // System.out.println(e); // }); //正確迭代---->單執行緒 // Iterator<String> iterator = strings.iterator(); // while (iterator.hasNext()){ // String next = iterator.next(); // if (next.equals("demo3")){ // iterator.remove(); // } // System.out.println(next); // } //正確迭代--->多執行緒 Iterator<String> iterator = strings.iterator(); for (int i = 0; i < 4; i++) { new Thread(()->{ synchronized (iterator){ while (iterator.hasNext()){ String next = iterator.next(); if (next.equals("demo3")){ iterator.remove(); } } } }).start(); } } }
併發容器
CopyOnWrite、Concurrent、BlockingQueue:根據具體場景進行設計,儘量避免使用鎖,提高容器的併發訪問性。
ConcurrentBlockingQueue:基於queue實現的FIFO的佇列。佇列為空,去操作會被阻塞
ConcurrentLinkedQueue:佇列為空,取得時候就直接返回空
package com.yb.thread.container; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; /** * @ClassName:Demo * @Description:TODO * @Author:chenyb * @Date:2020/9/29 9:50 下午 * @Versiion:1.0 */ public class Demo { public static void main(String[] args) { CopyOnWriteArrayList<String> strings=new CopyOnWriteArrayList<>(); for (int i = 0; i < 1000; i++) { strings.add("demo"+i); } //正常操作--->單執行緒 // strings.forEach(e->{ // if (e.equals("demo2")){ // strings.remove(e); // } // }); //錯誤操作,不支援迭代器移除元素,直接拋異常 // Iterator<String> iterator = strings.iterator(); // while (iterator.hasNext()){ // String next = iterator.next(); // if (next.equals("demo2")){ // iterator.remove(); // } // } //正常操作--->多執行緒 for (int i = 0; i < 4; i++) { new Thread(()->{ strings.forEach(e -> { if (e.equals("demo2")) { strings.remove(e); } }); }).start(); } } }
LinkedBlockingQueue
可以作為生產者消費者的中間商(使用put、take)。
package com.yb.thread.container; import java.util.concurrent.LinkedBlockingDeque; /** * @ClassName:Demo2 * @Description:TODO * @Author:chenyb * @Date:2020/9/29 10:05 下午 * @Versiion:1.0 */ public class Demo2 { public static void main(String[] args) { LinkedBlockingDeque<String> strings = new LinkedBlockingDeque<>(); //新增元素,3種方式 strings.add("陳彥斌"); //佇列滿的時候,會拋異常 strings.offer("陳彥斌"); //如果佇列滿了,直接入隊失敗 try { strings.put("陳彥斌"); //佇列滿,進入阻塞狀態 } catch (InterruptedException e) { e.printStackTrace(); } //從佇列中取元素,3種方式 String remove = strings.remove(); //會丟擲異常 strings.poll(); //在佇列為空的時候,直接返回null try { strings.take(); //佇列為空的時候,會進入等待狀態 } catch (InterruptedException e) { e.printStackTrace(); } } }
併發工具類
CountDownLatch
- await():進入等待狀態
- countDown:計算器減一
應用場景
- 啟動三個執行緒計算,需要對結果進行累加
package com.yb.thread.tool; import java.util.concurrent.CountDownLatch; /** * @ClassName:CountDownLatchDemo * @Description:TODO * @Author:chenyb * @Date:2020/9/29 10:26 下午 * @Versiion:1.0 */ public class CountDownLatchDemo { public static void main(String[] args) { //模擬場景,學校比較,800米,跑完之後,有跨欄 //需要先將800米跑完,在佈置跨欄,要不然跑800米的選手會被累死 CountDownLatch countDownLatch = new CountDownLatch(8); new Thread(()->{ try { countDownLatch.await(); System.out.println("800米比賽結束,準備清跑道,並進行跨欄比賽"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); for (int i = 0; i < 8; i++) { int finalI = i; new Thread(()->{ try { Thread.sleep(finalI *1000L); System.out.println(Thread.currentThread().getName()+",到達終點"); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); } }).start(); } } }
CyclicBarrier
允許一組執行緒相互等待達到一個公共的障礙點,之後繼續執行
區別
- CountDownLatch一般用於某個執行緒等待若干個其他執行緒執行完任務之後,他才執行:不可重複使用
- CyclicBarrier一般用於一組執行緒相互等待至某個狀態,然後這一組執行緒再同時執行:可重用
package com.yb.thread.tool; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * @ClassName:CyclicBarrierDemo * @Description:TODO * @Author:chenyb * @Date:2020/9/29 10:42 下午 * @Versiion:1.0 */ public class CyclicBarrierDemo { public static void main(String[] args) { //模擬場景:學校800米跑步,等到所有選手全部到齊後,一直跑 CyclicBarrier cyclicBarrier=new CyclicBarrier(8); for (int i = 0; i < 8; i++) { int finalI = i; new Thread(()->{ try { Thread.sleep(finalI *1000L); System.out.println(Thread.currentThread().getName()+",準備就緒"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("選手已到齊,開始比賽"); }).start(); } } }
Semaphore(訊號量)
控制執行緒併發數量
應用場景
- 介面限流
package com.yb.thread.tool; import java.util.concurrent.Semaphore; /** * @ClassName:SemaphoreDemo * @Description:TODO * @Author:chenyb * @Date:2020/9/29 11:11 下午 * @Versiion:1.0 */ public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(8); for (int i = 0; i < 20; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ",開始執行"); Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } finally { //釋放 semaphore.release(); } }).start(); } } }
Exchange
它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料(成對)。
應用場景
- 交換資料
package com.yb.thread.tool; import java.util.concurrent.Exchanger; /** * @ClassName:ExchangerDemo * @Description:TODO * @Author:chenyb * @Date:2020/9/29 11:21 下午 * @Versiion:1.0 */ public class ExchangerDemo { public static void main(String[] args) { Exchanger<String> stringExchanger=new Exchanger<>(); String str1="陳彥斌"; String str2="ybchen"; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"--------------初始值:"+str1); try { String exchange = stringExchanger.exchange(str1); System.out.println(Thread.currentThread().getName()+"--------------交換:"+exchange); } catch (InterruptedException e) { e.printStackTrace(); } },"執行緒A").start(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"--------------初始值:"+str2); try { String exchange = stringExchanger.exchange(str2); System.out.println(Thread.currentThread().getName()+"--------------交換:"+exchange); } catch (InterruptedException e) { e.printStackTrace(); } },"執行緒B").start(); } }