1. 程式人生 > >實戰java高併發程式設計之CountDownLatch原始碼分析

實戰java高併發程式設計之CountDownLatch原始碼分析

首先看第一個!

CountDownLatch

使用場景

CountDownLatch類是常見的併發同步控制類,適用於某一執行緒的執行在其他多個執行緒執行完成之後,比如火箭發射前需要各項指標檢查,只有當各項指標檢查完才能發射,再比如解析多個excel文件,只有當各個excel解析完成後,才能進行彙總。

程式碼例項

main執行緒代表點火,thread1/thread2/thread3分別表示三個檢查指標,三個執行緒執行完成之後,點火執行緒通過await()方法判斷三個執行緒是否執行成功,成功則繼續執行點火執行緒,否則繼續等待。

public class CountDownLatchTest {
	
	static
final CountDownLatch cd = new CountDownLatch(3); // 需要等待3個執行緒 static class CheckOne implements Runnable { @Override public void run() { try { Thread.sleep(2000); // sleep模擬檢查 } catch (InterruptedException e) { e.printStackTrace(); } cd.countDown(); // 該執行緒執行完,cd計數器減1 System.
out.println(Thread.currentThread().getName() + " :End!"); } } static class CheckTwo implements Runnable{ @Override public void run() { try { Thread.sleep(2000); // sleep模擬檢查 } catch (InterruptedException e) { e.printStackTrace(); } cd.countDown(); // 該執行緒執行完,cd計數器減1 System.
out.println(Thread.currentThread().getName() + " :End!"); } } static class CheckThree implements Runnable{ @Override public void run() { try { Thread.sleep(2000); // sleep模擬檢查 } catch (InterruptedException e) { e.printStackTrace(); } cd.countDown(); // 該執行緒執行完,cd計數器減1 System.out.println(Thread.currentThread().getName() + " :End!"); } } public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(new CheckOne(), "thread1"); thread1.start(); Thread thread2 = new Thread(new CheckTwo(), "thread2"); thread2.start(); Thread thread3 = new Thread(new CheckThree(), "thread3"); thread3.start(); cd.await(); // 判斷cd計數器是否為0,否則繼續等待 System.out.println(Thread.currentThread().getName() + ": ok!"); } }

執行結果:

thread1 :End!
thread2 :End!
thread3 :End!
main: ok!
原始碼分析

構造方法:

public CountDownLatch(int count);  // count表示計數器
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 呼叫內部類Sync的構造方法
        this.sync = new Sync(count);
    }
    
Sync(int count) {
      setState(count);
}

CountDownLatch內部類Sync繼承自AbstratcQueuedSynchronizer;所以其同步性利用AQS(AbstratcQueuedSynchronizer)框架實現。AQS內部維持一個volatile修飾的int型別的state;此處構造的引數count即為state賦值;

比較重要的方法有:

public void countdown();  // 呼叫一次 計數器減1
public void countDown() {
    sync.releaseShared(1);  // 實則呼叫sync的tryReleaseShared方法
}
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;   // 對status值減1
                if (compareAndSetState(c, nextc))  // 通過原子操作把改變後的status值寫入記憶體中
                    return nextc == 0;
            }
        }

某執行緒執行完呼叫countDown(),表示所等待的執行緒數減1;

public void await() throws InterruptedException; // 等待的執行緒呼叫await,判斷計數器是否為0
public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();   // 如果呼叫await的執行緒被阻塞,則丟擲異常
        if (tryAcquireShared(arg) < 0)    // status值為0,則等待結束,否則繼續執行
            doAcquireSharedInterruptibly(arg);   // 繼續阻塞執行緒
}
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;   // 根據status值是否為0,返回1或者-1
}

很明顯,呼叫await()方法的執行緒通過判斷state的值是否為零選擇執行還是阻塞;

public boolean await(long timeout,TimeUnit unit) throws InterruptedException; //
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
    }

此方法與await()類似,只不過等待有限;若到達等待時間status值不為0則直接執行不等待;

注意事項

CountDownLatch需要明確等待的條件,即確定引數值。

參考資料