J.U.C工具類中的CountDownLatch和CyclicBarrier
API文檔是這樣介紹的:一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點(common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此使CyclicBarrier很有用。因為該barrier在釋放等待線程後可以重用,所以稱它為循環的barrier。
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最後一個線程到達之後(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作 很有用。
類圖:
上圖可知:CyclicBarrier的內部是使用重入鎖ReetrantLock和Condition。
構造函數
源碼
/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); }
Parties:表示攔截線程的數量。
barrierAction:為CyclicBarrier接受的Runnable命令,用於線程到達屏障時,優先執行barrierAction,用於處理復雜的業務場景.
<br/>
現在我們來看看CyclicBarrier最重要的函數await():
代碼:
await()方法: public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } dowait()方法: private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //獲取鎖 final ReentrantLock lock = this.lock; lock.lock(); try { //分代 final Generation g = generation; //當前generation“已損壞”,拋出BrokenBarrierException異常 //拋出該異常一般都是某個線程在等待某個處於“斷開”狀態的CyclicBarrie if (g.broken) //當某個線程試圖等待處於斷開狀態的 barrier 時,或者 barrier 進入斷開狀態而線程處於等待狀態時,拋出該異常 throw new BrokenBarrierException(); //如果線程中斷,終止CyclicBarrier if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //進來一個線程 count - 1 int index = --count; //count == 0 表示所有線程均已到位,觸發Runnable任務 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //觸發任務 if (command != null) command.run(); ranAction = true; //喚醒所有等待線程,並更新generation nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { //如果不是超時等待,則調用Condition.await()方法等待 if (!timed) trip.await(); else if (nanos > 0L) //超時等待,調用Condition.awaitNanos()方法等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We‘re about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); //generation已經更新,返回index if (g != generation) return index; //“超時等待”,並且時間已到,終止CyclicBarrier,並拋出異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放鎖 lock.unlock(); } }
我們再看看jdkAPI文檔描述:
public int await()
throws InterruptedException,
BrokenBarrierException
在所有 參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。
如果當前線程不是將到達的最後一個線程,出於調度目的,將禁用它,且在發生以下情況之一前,該線程將一直處於休眠狀態:
最後一個線程到達;或者
其他某個線程中斷當前線程;或者
其他某個線程中斷另一個等待線程;或者
其他某個線程在等待 barrier 時超時;或者
其他某個線程在此 barrier 上調用 reset()。
如果當前線程:
在進入此方法時已經設置了該線程的中斷狀態;或者
在等待時被中斷
則拋出 InterruptedException,並且清除當前線程的已中斷狀態。
如果在線程處於等待狀態時 barrier 被 reset(),或者在調用 await 時 barrier 被損壞,抑或任意一個線程正處於等待狀態,則拋出 BrokenBarrierException 異常。
如果任何線程在等待時被 中斷,則其他所有等待線程都將拋出 BrokenBarrierException 異常,並將 barrier 置於損壞狀態。
如果當前線程是最後一個將要到達的線程,並且構造方法中提供了一個非空的屏障操作,則在允許其他線程繼續運行之前,當前線程將運行該操作。如果在執行屏障操作過程中發生異常,則該異常將傳播到當前線程中,並將 barrier 置於損壞狀態。
返回:
到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,零指示最後一個到達的線程
拋出:
InterruptedException - 如果當前線程在等待時被中斷
BrokenBarrierException - 如果 另一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者在調用 await 時 barrier 被損壞,抑或由於異常而導致屏障操作(如果存在)失敗。
示例:
public class CyclicBarrierTest {
public static CyclicBarrier cyclicBarrier;
public static class WorkThread extends Thread
{
public void run()
{
try {
System.out.println(Thread.currentThread().getName()+"已進入");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+"離開");
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
cyclicBarrier=new CyclicBarrier(4,new Runnable() {
public void run()
{
System.out.println("人已經到齊");
}
});
for(int i=0;i<4;i++)
{
new WorkThread().start();
}
}
}
運行結果:
Thread-0已進入
Thread-2已進入
Thread-1已進入
Thread-3已進入
人已經到齊
Thread-3離開
Thread-0離開
Thread-1離開
Thread-2離開
講解CountDownLatch
看一下JDK文檔的描述:
-
一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
-
用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
-
CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
- CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,而在所有線程都能通過之前,它只是阻止任何線程繼續通過一個 await。
構造函數
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
從上述可以看出:count :在線程能通過 await() 之前,必須調用 countDown() 的次數
說到這我們來看看CountDownLatch中的連個方法:countDown()和await();
await()方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
描述:
使當前線程在鎖存器倒計數至零之前一直等待,除非線程被 中斷。
如果當前計數為零,則此方法立即返回。
如果當前計數大於零,則出於線程調度目的,將禁用當前線程,且在發生以下兩種情況之一前,該線程將一直處於休眠狀態:
由於調用 countDown() 方法,計數到達零;或者
其他某個線程中斷當前線程。
如果當前線程:
在進入此方法時已經設置了該線程的中斷狀態;或者
在等待時被中斷,
則拋出 InterruptedException,並且清除當前線程的已中斷狀態。
countDown()方法:
public void countDown() {
sync.releaseShared(1);
}
描述:遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。
如果當前計數大於零,則將計數減少。如果新的計數為零,出於線程調度目的,將重新啟用所有的等待線程。
如果當前計數等於零,則不發生任何操作。
示例:
public class CountDownLatchTest {
private static class WorkThread extends Thread
{
private CountDownLatch cdl;
private int sleepSecond;
public WorkThread(String name, CountDownLatch cdl, int sleepSecond)
{
super(name);
this.cdl = cdl;
this.sleepSecond = sleepSecond;
}
public void run()
{
try
{
System.out.println(this.getName() + "啟動了,時間為" + System.currentTimeMillis());
Thread.sleep(sleepSecond * 1000);
cdl.countDown();
System.out.println(this.getName() + "執行完了,時間為" + System.currentTimeMillis());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
private static class DoneThread extends Thread
{
private CountDownLatch cdl;
public DoneThread(String name, CountDownLatch cdl)
{
super(name);
this.cdl = cdl;
}
public void run()
{
try
{
System.out.println(this.getName() + "要等待了, 時間為" + System.currentTimeMillis());
cdl.await();
System.out.println(this.getName() + "等待完了, 時間為" + System.currentTimeMillis());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception
{
CountDownLatch cdl = new CountDownLatch(3);
DoneThread dt0 = new DoneThread("DoneThread1", cdl);
DoneThread dt1 = new DoneThread("DoneThread2", cdl);
dt0.start();
dt1.start();
WorkThread wt0 = new WorkThread("WorkThread1", cdl, 2);
WorkThread wt1 = new WorkThread("WorkThread2", cdl, 3);
WorkThread wt2 = new WorkThread("WorkThread3", cdl, 4);
wt0.start();
wt1.start();
wt2.start();
}
}
運行結果:
DoneThread2要等待了, 時間為1529917959491
DoneThread1要等待了, 時間為1529917959491
WorkThread1啟動了,時間為1529917959492
WorkThread2啟動了,時間為1529917959492
WorkThread3啟動了,時間為1529917959492
WorkThread1執行完了,時間為1529917961493
WorkThread2執行完了,時間為1529917962492
WorkThread3執行完了,時間為1529917963492
DoneThread1等待完了, 時間為1529917963492
DoneThread2等待完了, 時間為1529917963492
兩者之間的比較
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。
J.U.C工具類中的CountDownLatch和CyclicBarrier