Java 進階——併發程式設計之執行緒同步利器CountDownLatch、CyclicBarrier、Semaphore 的使用小結
引言
Java 語言之所以廣泛運用於服務端程式,很大一部分原因就是因為在JDK中Java 已經為我們提供了很多併發場景的解決方案,藉助這些系統方案我們可以快速應用於具體場景,甚至是在系統方案上進行擴充套件,這篇文章就好好總結下三種執行緒控制工具類。
一、CountDownLatch
1、CountDownLatch概述
CountDownLatch 是一種允許一個或多個執行緒阻塞等待,直到在其他執行緒中的一系列操作完成之後,再繼續執行的一次性同步機制(A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes)。簡單理解CountDownLatch(內部是基於AbstractQueuedSynchronizer,篇幅問題不在此篇文章分析範圍內)就是一個基於訊號量機制的阻塞和喚醒執行緒的工具類,通過呼叫CountDownLatch提供的api方法就可以達到靈活阻塞和喚醒執行緒
2、CountDownLatch的主要方法
方法名稱 | 說明 |
---|---|
public CountDownLatch(int count) | 通過指定訊號量值構造CountDownLatch |
public void await() |
呼叫await()方法的執行緒會被阻塞,直到count值為0才繼續執行 |
public boolean await(long timeout,TimeUnit unit) | 和await()類似,只不過等待一定的時間後count值還沒變為0的話就會繼續執行 |
public void countDown() | 將count值減1 |
3、CountDownLatch的簡單應用
眾所周知,即使我們幾乎同時創建出若干條子執行緒,且每個子執行緒內部執行的工作一樣,也不能保證每個子執行緒執行的順序和結束的時間,因為啟動過程和執行過程需要靠cpu 來排程,下面這個例子每次執行的結果也有所不同,但是基本上通過CountDownLatch都能保證主執行緒在每一組的所有子執行緒執行完畢之後再繼續執行,總之呼叫await方法時後且當前訊號量值不為0 呼叫執行緒則會被阻塞,而且一個執行緒可以被一個或者多個CountDownLatch控制。
- 通過CountDownLatch(int count)初始化建立CountDownLatch例項
- 根據具體場景在某個執行緒中呼叫await()或者await(long timeout,TimeUnit unit)方法嘗試阻塞該執行緒
- 線上程內部或者其他執行緒呼叫 countDown()對計數減一
public class CountDownLatchDemo {
private final static int GROUP_SIZE = 3;
public static void main(String[] args) throws InterruptedException {
processOneGroup(第一組);
}
/**
*
* @param name
* @throws InterruptedException
*/
private static void processOneGroup(final String name)
throws InterruptedException {
final CountDownLatch mainCountDown = new CountDownLatch(GROUP_SIZE);//用於控制主執行緒的CountDownLatch
final CountDownLatch startCountDown = new CountDownLatch(1);//
System.out.println("***首先"+Thread.currentThread().getName()+"執行緒執行:" + "通過迴圈建立若干工作執行緒" + "***");
for (int i = 0; i < GROUP_SIZE; i++) {
new Thread(String.valueOf(i)) {
public void run() {
System.out.println(name + "的" + getName() + "執行緒已經準備就緒");
try {
startCountDown.countDown();
startCountDown.await();//如果startCountDown初始訊號量值為1的話經過上一步的coutDown 當前訊號量值已經為0 了 即使呼叫了await也無法阻塞
System.out.println("先呼叫await方法阻塞"+getName()+ "執行緒");
System.out.println(getName()+ "執行緒處理自己的工作ing");
Thread.sleep(3000);
mainCountDown.countDown();//主執行緒計數減一
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子執行緒"+Thread.currentThread().getName() + "執行:"+name + "的" + getName() + "執行緒已執行完畢");
}
}.start();
}
mainCountDown.await();// 等待所有子執行緒執行完畢
System.out.println("執行緒"+Thread.currentThread().getName() + "執行:"+name + "中所有工作執行緒工作完畢");
//startCountDown.countDown();
System.out.println("----end執行緒"+Thread.currentThread().getName() + "繼續執行------");
}
二、CyclicBarrier
1、CyclicBarrier概述
CyclicBarrier首先從字面上理解是Cyclic Barrier 即迴圈屏障。CountDownLatch和CyclicBarrier都能夠實現執行緒之間的等待,只不過它們側重點不同:CountDownLatch一般用於某個執行緒A等待若干個其他執行緒執行完任務之後,它才執行;而CyclicBarrier一般用於一組執行緒互相等待至某個狀態,然後這一組執行緒再同時執行;而且CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。簡而言之,CyclicBarrier可以讓一組執行緒阻塞(暫且把這個狀態就叫做barrier)直到barrier狀態之後再全部(全部指的是getParties 獲取到的所有等待執行緒)同時執行,當某個執行緒呼叫await()方法之後,該執行緒就處於barrier了。
2、CyclicBarrier的主要方法
方法名稱 | 說明 |
---|---|
public CyclicBarrier(int parties) | parties:必須要呼叫await方法的執行緒數, |
public CyclicBarrier(int parties, Runnable barrierAction) | barrierAction:clicBarrier到達 barrier狀態時候要執行才操作 |
public void await() | 呼叫await()方法的執行緒會被阻塞,直到CyclicBarrier上所有的執行緒(Waits until all getParties parties have invoked await on this barrier)都執行了await方法 |
public boolean await(long timeout,TimeUnit unit) | 和await()類似,讓這些執行緒等待至一定的時間,如果還有執行緒沒有到達barrier狀態,也直接讓到達barrier的執行緒執行後續任務。 |
public void reset() | 重用CyclicBarrier,將CyclicBarrier狀態置為初始態,若當前還有其他執行緒在阻塞則會丟擲BrokenBarrierException異常 |
3、CyclicBarrier的簡單應用
3.1、CyclicBarrier的基本應用
package concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
private static final int THREAD_COUNT=3;
/**
* 如果這裡parties 不等於THREAD_COUNT會出現問題,可能會一直卡住
*/
private final static CyclicBarrier CYCLIC_BARRIER=new CyclicBarrier(THREAD_COUNT, new Runnable() {
@Override
public void run() {
System.out.println("全部團員已經結束上次行程,導遊開始點名,準備進行下一個環節");
}
});
public static void main(String[] args) {
for(int i=0;i<3;i++){
new Thread(String.valueOf(i)){
public void run(){
try {
System.out.println("執行緒"+getName()+" 已到達旅遊地點!");
CYCLIC_BARRIER.await();//阻塞當前執行緒
System.out.println("執行緒"+getName()+" 開始騎車");
Thread.sleep(2000);
CYCLIC_BARRIER.await();
System.out.println("執行緒"+getName()+" 開始爬山");
Thread.sleep(3000);
CYCLIC_BARRIER.await();
System.out.println("執行緒"+getName()+" 回賓館休息");
Thread.sleep(1000);
CYCLIC_BARRIER.await();
System.out.println("執行緒"+getName()+" 開始乘車回家");
CYCLIC_BARRIER.await();
System.out.println("執行緒"+getName()+" 回家了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
System.out.print(e.getMessage());
e.printStackTrace();
}
}
}.start();
}
}
}
3.2、CyclicBarrier的重用
package concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class ReuseCyclicBarrier {
public static void main(String[] args) {
final int THREAD_COUNT=3;
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {
@Override
public void run() {
print("所有工作執行緒都已到達barrier狀態");
}
});
print("*******第一組 使用 CyclicBarrier*********");
for(int i=0;i<THREAD_COUNT;i++) {
new Worker(barrier).start();
}
try {
Thread.sleep(9000);//此處為了保證主執行緒被阻塞直到第一組3個執行緒執行完畢所有的工作,所以至少要sleep 3000*3,如果少於則不會等到其他三個執行緒執行完畢之後才執行主執行緒
} catch (InterruptedException e) {
e.printStackTrace();
}
print("*************第二組 重用 CyclicBarrier***************\n");
//barrier.reset(); Resets the barrier to its initial state. If any parties are currently
//waiting at the barrier, they will return with a BrokenBarrierException
barrier = new CyclicBarrier(THREAD_COUNT-1, new Runnable() {
@Override
public void run() {
print("【重用】所有工作執行緒都已到達barrier狀態");
}
});
for(int i=0;i<THREAD_COUNT;i++) {
new Worker(barrier).start();
}
}
static class Worker extends Thread{
private CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
doWork(3000L);
print("操作資料完畢,等待其他工作執行緒"+"cyclicBarrier中的Parties數"+cyclicBarrier.getParties());
cyclicBarrier.await();//只是會阻塞,如果這裡耗時很短的話,並不會阻塞主執行緒
}catch(BrokenBarrierException | InterruptedException e){
e.printStackTrace();
}
print("所有工作執行緒都已到達barrier狀態後,所有執行緒執行操作完畢,繼續處理其他任務...");
}
}
public static void print(String s) {
System.out.println(Thread.currentThread().getName()+" 執行緒 : "+s);
}
public static void doWork(long mills){
try {
print("正在操作資料ing");
Thread.sleep(mills);//以睡眠來模擬資料操作
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
三、Semaphore
1、Semaphore 概述
Semaphore首先從字面上理解是 訊號量,Semaphore可以控制同時訪問某組資源的執行緒個數,Semaphore當前在多執行緒環境下被廣泛使用,作業系統的訊號量是個很重要的概念,在程序控制方面都有應用。Java 併發庫 的Semaphore 可以很輕鬆完成訊號量控制可以通過 acquire() 方法獲取一個許可,如果沒有就阻塞;而 release() 方法釋放一個許可,其實Semaphore和鎖機制有點類似,訊號量一般用於控制對 某組 資源的訪問許可權,而鎖是控制對 某個 資源的訪問許可權。
2、Semaphore 的主要方法
方法名稱 | 說明 |
---|---|
public Semaphore(int permits) | permits:一組資源最大同時可以被permits個執行緒所訪問 |
public Semaphore(int permits, boolean fair) | fair:和阻塞佇列一樣用於標識是否採用公平的訪問優先策略,true:按照先來後到的順序獲得機會;false:隨機獲得優先機會 |
public void acquire() | 獲取一個許可,若無許可能夠獲得,則會一直阻塞,直到獲得許可。 |
public void acquire(int permits) | 獲取permits個許可 |
public void release() | 釋放一個許可,在釋放許可之前,必須先獲獲得許可 |
public void release(int permits) | 釋放permits個許可 |
以上四種方法都會阻塞,下面tryXXX的不會阻塞—————— | |
public boolean tryAcquire() | 嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false |
public boolean tryAcquire(long timeout, TimeUnit unit) | 嘗試獲取一個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false |
public boolean tryAcquire(int permits) | 嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false |
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) | 嘗試獲取permits個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false |
3、Semaphore 的簡單應用
舉個例子每一層樓指紋打卡機只有3臺,而打卡的員工遠不止,那麼為了更高效率,只得確保同時只能有3個人能夠使用,而當這3個人中的任何一個人打卡完畢離開後,剩下正在等待的其他人中馬上又有1個人可以使用了(這個人可以是等待隊伍中的任意一個,可以是隨機獲得優先機會,也可以是按照先來後到的順序獲得機會)這取決於構造Semaphore物件時傳入的引數選項。另外單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”,這可應用於死鎖恢復的一些場合。
package concurrent;
import java.util.Random;
import java.util.concurrent.Semaphore;
/**
* 假如一個工廠有3臺打卡機,但是有6名員工,一臺機器同時只能被一個員工使用,只有使用完了,其他員工才能繼續使用
* @author Crazy.Mo
*
*/
public class SemaphoreTest {
public static void main(String[] args) {
final int WORKER_NUM = 6; //總員工數
final int SOURCE_COUNT=3;//3臺指紋打卡機,一組資源
Semaphore semaphore = new Semaphore(SOURCE_COUNT); //最大同時允許SOURCE_COUNT 個員工使用
for(int i=0;i<WORKER_NUM;i++){
new FingerPrint(i,semaphore).start();
}
}
static class FingerPrint extends Thread{
private int num;
private Semaphore semaphore;
public FingerPrint(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
print("員工"+this.num+"正在使用一臺指紋打卡機ing。。。");
Thread.sleep(new Random().nextInt(3000));
print("員工"+this.num+"釋放出一臺指紋打卡機");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void print(String s) {
System.out.println(Thread.currentThread().getName()+" 執行緒 : "+s);
}
public static void doWork(long mills){
try {
Thread.sleep(mills);//以睡眠來模擬打卡
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}