關於對CountDownLatch、CyclicBarrier、Semaphore執行緒同步理解
概念描述以及程式碼理解
- CountDownLatch(閉鎖)
出現再JDK1.5中,主要是使一個執行緒A或是組執行緒A等待其它執行緒執行完畢後,一個執行緒A或是組執行緒A才繼續執行,可以實現執行緒組同步執行,並在所有執行緒組結束後再執行等待的執行緒,閉鎖的狀態是一次性。 例如:主執行緒等待執行緒組執行完畢後再執行,是執行緒組之間的等待。
public class CountDownLatchTest { // 模擬了100米賽跑,10名選手已經準備就緒,只等裁判一聲令下。當所有人都到達終點時,比賽結束。 public static void main(String[] args) throws InterruptedException { // 開始的倒數鎖 final CountDownLatch begin = new CountDownLatch(1); // 結束的倒數鎖 final CountDownLatch end = new CountDownLatch(10); // 十名選手 final ExecutorService exec = Executors.newFixedThreadPool(10); for (int index = 0; index < 10; index++) { final int NO = index + 1; Runnable run = new Runnable() { public void run() { try { // 如果當前計數為零,則此方法立即返回。 // 等待 begin.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + NO + " arrived"); } catch (InterruptedException e) { } finally { // 每個選手到達終點時,end就減一 end.countDown(); } } }; exec.submit(run); } System.out.println("Game Start"); // begin減一,開始遊戲 begin.countDown(); // 等待end變為0,即所有選手到達終點 end.await(); System.out.println("Game Over"); exec.shutdown(); } }
wait還有一個超時的方法 public boolean await(long timeout, TimeUnit unit) throws InterruptedException 使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間。如果當前計數為零,則此方法立刻返回 1. 在進入此方法時已經設定了該執行緒的中斷狀態;或者 在等待時被中斷,則丟擲 InterruptedException,並且清除當前執行緒的已中斷狀態。 2. 如果超出了指定的等待時間,則返回值為 false
Game Start No.9 arrived No.6 arrived No.8 arrived No.7 arrived No.10 arrived No.1 arrived No.5 arrived No.4 arrived No.2 arrived No.3 arrived Game Over
執行過程理解:一共有11個執行緒,主執行緒建立並啟動10個子執行緒,然後子執行緒組再執行的時候被 begin.await()都阻塞了,主執行緒繼續執行到 begin.countDown()時,begin 計數器變成0, end.await()阻塞主執行緒,10個子執行緒繼續執行,因為begin 計數器成0,就不會被阻塞了,等10個子執行緒執行完畢(end計數器減一),end為0時,end.await()執行後面的程式碼,執行流程結束。
2.CyclicBarrier(迴圈屏障)
主要是一組執行緒使用await()方法之後,執行緒就處於barrier狀態了,當所有執行緒都到達各自的barrier後,再同時執行各自barrier下面的程式碼,是執行緒之間的互相等待。例如:團隊旅遊,一個團隊通常分為幾組,每組人走的路線可能不同,但都需要到達某一地點等待團隊其它成員到達後才能進行下一站,是執行緒組內間的等待。
- CyclicBarrier提供的方法有:
——CyclicBarrier(parties)
初始化相互等待的執行緒數量的構造方法。
——CyclicBarrier(parties,Runnable barrierAction)
初始化相互等待的執行緒數量以及屏障執行緒的構造方法。
屏障執行緒的執行時機:等待的執行緒數量=parties之後,CyclicBarrier開啟屏障之前。
舉例:在分組計算中,每個執行緒負責一部分計算,最終這些執行緒計算結束之後,交由屏障執行緒進行彙總計算。
——getParties()
獲取CyclicBarrier開啟屏障的執行緒數量,也成為方數。
——getNumberWaiting()
獲取正在CyclicBarrier上等待的執行緒數量。
——await()
在CyclicBarrier上進行阻塞等待,直到發生以下情形之一:
在CyclicBarrier上等待的執行緒數量達到parties,則所有執行緒被釋放,繼續執行。
當前執行緒被中斷,則丟擲InterruptedException異常,並停止等待,繼續執行。
其他等待的執行緒被中斷,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
其他等待的執行緒超時,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
其他執行緒呼叫CyclicBarrier.reset()方法,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
//建構函式1:初始化-開啟屏障的方數
CyclicBarrier barrier0 = new CyclicBarrier(2);
//通過barrier.getParties()獲取開啟屏障的方數
LOGGER.info("barrier.getParties()獲取開啟屏障的方數:" + barrier0.getParties());
System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的執行緒數
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:初始----" + barrier0.getNumberWaiting());
System.out.println();
new Thread(() -> {
//新增一個等待執行緒
LOGGER.info("新增第1個等待執行緒----" + Thread.currentThread().getName());
try {
barrier0.await();
LOGGER.info(Thread.currentThread().getName() + " is running...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(10);
//通過barrier.getNumberWaiting()獲取正在等待的執行緒數
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:新增第1個等待執行緒---" + barrier0.getNumberWaiting());
Thread.sleep(10);
System.out.println();
new Thread(() -> {
//新增一個等待執行緒
LOGGER.info("新增第2個等待執行緒----" + Thread.currentThread().getName());
try {
barrier0.await();
LOGGER.info(Thread.currentThread().getName() + " is running...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(100);
System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的執行緒數
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---" + barrier0.getNumberWaiting());
//已經開啟的屏障,再次有執行緒等待的話,還會重新生效--視為迴圈
new Thread(() -> {
LOGGER.info("屏障開啟之後,再有執行緒加入等待:" + Thread.currentThread().getName());
try {
//BrokenBarrierException
barrier0.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
System.out.println();
Thread.sleep(10);
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---" + barrier0.getNumberWaiting());
Thread.sleep(10);
new Thread(() -> {
LOGGER.info("屏障開啟之後,再有執行緒加入等待:" + Thread.currentThread().getName());
try {
//BrokenBarrierException
barrier0.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(10);
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---" + barrier0.getNumber
2018-04-01 13:27:55 INFO - barrier.getParties()獲取開啟屏障的方數:2
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:初始----0
2018-04-01 13:27:55 INFO - 新增第1個等待執行緒----Thread-0
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:新增第1個等待執行緒---1
2018-04-01 13:27:55 INFO - 新增第2個等待執行緒----Thread-1
2018-04-01 13:27:55 INFO - Thread-1 is running...
2018-04-01 13:27:55 INFO - Thread-0 is running...
2018-04-01 13:27:55 INFO - Thread-1 is terminated.
2018-04-01 13:27:55 INFO - Thread-0 is terminated.
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---0
2018-04-01 13:27:55 INFO - 屏障開啟之後,再有執行緒加入等待:Thread-2
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---1
2018-04-01 13:27:55 INFO - 屏障開啟之後,再有執行緒加入等待:Thread-3
2018-04-01 13:27:55 INFO - Thread-3 is terminated.
2018-04-01 13:27:55 INFO - Thread-2 is terminated.
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---0
- 熟悉reset()的用法
如果是一個初始的CyclicBarrier,則reset()之後,什麼也不會發生,如果在等待過程中,執行reset()方法,等待的執行緒跑出BrokenBarrierException異常,並不再等待。
在這裡插入程式碼片
- Semaphore (訊號量)
是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源,這個理解應該不難。
package javalearning;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
private Semaphore smp = new Semaphore(3);
private Random rnd = new Random();
class TaskDemo implements Runnable{
private String id;
TaskDemo(String id){
this.id = id;
}
@Override
public void run(){
try {
smp.acquire();
System.out.println("Thread " + id + " is working");
Thread.sleep(rnd.nextInt(1000));
smp.release();
System.out.println("Thread " + id + " is over");
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args){
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
//注意我建立的執行緒池型別,
ExecutorService se = Executors.newCachedThreadPool();
se.submit(semaphoreDemo.new TaskDemo("a"));
se.submit(semaphoreDemo.new TaskDemo("b"));
se.submit(semaphoreDemo.new TaskDemo("c"));
se.submit(semaphoreDemo.new TaskDemo("d"));
se.submit(semaphoreDemo.new TaskDemo("e"));
se.submit(semaphoreDemo.new TaskDemo("f"));
se.shutdown();
}
}
執行結果
Thread c is working
Thread b is working
Thread a is working
Thread c is over
Thread d is working
Thread b is over
Thread e is working
Thread a is over
Thread f is working
Thread d is over
Thread e is over
Thread f is over
可以看出,最多同時有三個執行緒併發執行,也可以認為有三個公共資源