多執行緒併發之CountDownLatch(閉鎖)使用詳解
阿新 • • 發佈:2019-02-17
【1】CountDownLatch是什麼
CountDownLatch,英文翻譯為倒計時鎖存器。是一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。
閉鎖可以延遲執行緒的進度直到其到達終止狀態,閉鎖可以用來確保某些活動直到其他活動都完成才繼續執行:
- 確保某個計算在其需要的所有資源都被初始化之後才繼續執行;
- 確保某個服務在其依賴的所有其他服務都已經啟動之後才啟動;
- 等待直到某個操作所有參與者都準備就緒再繼續執行。
CountDownLatch有一個正數計數器,countDown()方法對計數器做減操作,await()方法等待計數器達到0。所有await的執行緒都會阻塞直到計數器為0或者等待執行緒中斷或者超時。
如下示例,在多執行緒執行情況下,計算多執行緒耗費時間:
public class TestCountDownLatch { public static void main(String[] args){ LatchDemo latchDemo = new LatchDemo(); long begin = System.currentTimeMillis(); //多執行緒 for (int i = 0; i <5 ; i++) { new Thread(latchDemo).start(); } //主執行緒 long end = System.currentTimeMillis(); System.out.println("耗費時間:"+(end-begin)); } } class LatchDemo implements Runnable{ @Override public void run() { for (int i = 0; i < 50000; i++) { if (i%2==0){ System.out.println(i); } } } }
如上示例,很顯然讓不能計算出多執行緒執行的時間!!這時,就可以使用 閉鎖解決這個問題。
【2】CountDownLatch原始碼分析與使用
CountDownLatch原始碼如下:
package java.util.concurrent; import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * A synchronization aid that allows one or more threads to wait until * a set of operations being performed in other threads completes. *//一種同步輔助工具,允許一個或多個執行緒等待,直到在其它執行緒中執行的一組操作完成。 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>. * // 一個CountDownLatch物件通常使用一個計數器(count)進行初始化。 * * The {@link #await await} methods block until the current count reaches * zero due to invocations of the {@link #countDown} method, after which * all waiting threads are released and any subsequent invocations of * {@link #await await} return immediately. * //await方法將會阻塞直到由於countDown方法執行將count (計數器)歸零, * //隨後將釋放所有等待的執行緒,await後面的方法將會立即返回。 * * This is a one-shot phenomenon-- the count cannot be reset. * /*這是一次性的現象,不能重置計數。*/ * If you need a version that resets the count, consider using a {@link CyclicBarrier}. */*如果你需要重置計數器,考慮使用CyclicBarrier--柵欄*/ * <p>A {@code CountDownLatch} is a versatile synchronization tool * and can be used for a number of purposes. * /*這個多功能的同步工具有許多作用*/ * * A {@code CountDownLatch} initialized with a count of one serves as a * simple on/off latch, or gate: all threads invoking {@link #await await} * wait at the gate until it is opened by a thread invoking {@link * #countDown}. * //以計數1初始化的{@code CountDownLatch}用作簡單的開/關鎖存器或閘門: * //呼叫{@linka wait a wait}的所有執行緒都在閘門等待, * //直到它被呼叫{@linkcountDown}的執行緒開啟為止。 * * A {@code CountDownLatch} initialized to <em>N</em> * can be used to make one thread wait until <em>N</em> threads have * completed some action, or some action has been completed N times. * /*初始化為N的{@code CountDownLatch}可用於使一個執行緒等待,*/ /* 直到<em>N</em>執行緒已經完成某個操作,或者某個操作已經完成N次。*/ * <p>A useful property of a {@code CountDownLatch} is that it * doesn't require that threads calling {@code countDown} wait for * the count to reach zero before proceeding, it simply prevents any * thread from proceeding past an {@link #await await} until all * threads could pass. */*一個有用特性是,它不要求呼叫{@code countDown}的執行緒在繼續之前等待計數達到零,*/ /*它只是防止任何執行緒通過{@linka wait a wait}直到所有執行緒都可以通過。*/ //下面是使用開關鎖的一個簡單例子 * <p><b>Sample usage:</b> Here is a pair of classes in which a group * of worker threads use two countdown latches: * <ul> * <li>The first is a start signal that prevents any worker from proceeding * until the driver is ready for them to proceed; * <li>The second is a completion signal that allows the driver to wait * until all workers have completed. * </ul> * * <pre> {@code * class Driver { // ... * void main() throws InterruptedException { * CountDownLatch startSignal = new CountDownLatch(1); * CountDownLatch doneSignal = new CountDownLatch(N); * * for (int i = 0; i < N; ++i) // create and start threads * new Thread(new Worker(startSignal, doneSignal)).start(); * * doSomethingElse(); // don't let run yet * startSignal.countDown(); // let all threads proceed * doSomethingElse(); * doneSignal.await(); // wait for all to finish * } * } * * class Worker implements Runnable { * private final CountDownLatch startSignal; * private final CountDownLatch doneSignal; * Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { * this.startSignal = startSignal; * this.doneSignal = doneSignal; * } * public void run() { * try { * startSignal.await(); * doWork(); * doneSignal.countDown(); * } catch (InterruptedException ex) {} // return; * } * * void doWork() { ... } * }}</pre> * //另外一個典型應用 * <p>Another typical usage would be to divide a problem into N parts, * describe each part with a Runnable that executes that portion and * counts down on the latch, and queue all the Runnables to an * Executor. When all sub-parts are complete, the coordinating thread * will be able to pass through await. (When threads must repeatedly * count down in this way, instead use a {@link CyclicBarrier}.) * * <pre> {@code * class Driver2 { // ... * void main() throws InterruptedException { * CountDownLatch doneSignal = new CountDownLatch(N); * Executor e = ... * * for (int i = 0; i < N; ++i) // create and start threads * e.execute(new WorkerRunnable(doneSignal, i)); * * doneSignal.await(); // wait for all to finish * } * } * * class WorkerRunnable implements Runnable { * private final CountDownLatch doneSignal; * private final int i; * WorkerRunnable(CountDownLatch doneSignal, int i) { * this.doneSignal = doneSignal; * this.i = i; * } * public void run() { * try { * doWork(i); * doneSignal.countDown(); * } catch (InterruptedException ex) {} // return; * } * * void doWork() { ... } * }}</pre> * * <p>Memory consistency effects: Until the count reaches * zero, actions in a thread prior to calling * {@code countDown()} * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> * actions following a successful return from a corresponding * {@code await()} in another thread. * * @since 1.5 * @author Doug Lea */ public class CountDownLatch { /** * CountDownLatch的同步控制,使用AQS狀態呈現Count * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. * * <p>If the current count is zero then this method returns immediately. * * <p>If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: * <ul> * <li>The count reaches zero due to invocations of the * {@link #countDown} method; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted * while waiting */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}, * or the specified waiting time elapses. * * <p>If the current count is zero then this method returns immediately * with the value {@code true}. * * <p>If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of three things happen: * <ul> * <li>The count reaches zero due to invocations of the * {@link #countDown} method; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>The specified waiting time elapses. * </ul> * * <p>If the count reaches zero then the method returns with the * value {@code true}. * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the specified waiting time elapses then the value {@code false} * is returned. If the time is less than or equal to zero, the method * will not wait at all. * * @param timeout the maximum time to wait * @param unit the time unit of the {@code timeout} argument * @return {@code true} if the count reached zero and {@code false} * if the waiting time elapsed before the count reached zero * @throws InterruptedException if the current thread is interrupted * while waiting */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 將count-1 ,如果count減一後為0 ,則釋放所有等待的執行緒 */ public void countDown() { sync.releaseShared(1); } /** 返回當前的count,該方法通常用在debug或者測試中 */ public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
簡單總結如下:
- 初始化使用計數器count;
- count代表多個執行緒執行或者某個操作執行次數;
- countDown()方法將會將count-1;
- count為0將會釋放所有等待執行緒;
- await方法將會阻塞直到count為0;
- 開關鎖應用;
- 問題分解應用。
修改【1】中程式碼如下:
public class TestCountDownLatch {
public static void main(String[] args){
//CountDownLatch 為唯一的、共享的資源
final CountDownLatch latch = new CountDownLatch(5);
LatchDemo latchDemo = new LatchDemo(latch);
long begin = System.currentTimeMillis();
for (int i = 0; i <5 ; i++) {
new Thread(latchDemo).start();
}
try {
//多執行緒執行結束前一直等待
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("耗費時間:"+(end-begin));
}
}
class LatchDemo implements Runnable{
private CountDownLatch latch;
public LatchDemo(CountDownLatch latch){
this.latch=latch;
}
public LatchDemo(){
super();
}
@Override
public void run() {
//當前物件唯一,使用當前物件加鎖,避免多執行緒問題
synchronized (this){
try {
for (int i = 0; i < 50000; i++) {
if (i%2==0){
System.out.println(i);
}
}
}finally {
//保證肯定執行
latch.countDown();
}
}
}
}
測試結果如下圖: