實戰java高併發程式設計之CountDownLatch原始碼分析
阿新 • • 發佈:2018-12-14
首先看第一個!
CountDownLatch
使用場景
CountDownLatch類是常見的併發同步控制類,適用於某一執行緒的執行在其他多個執行緒執行完成之後
,比如火箭發射前需要各項指標檢查,只有當各項指標檢查完才能發射,再比如解析多個excel文件,只有當各個excel解析完成後,才能進行彙總。
程式碼例項
main執行緒代表點火,thread1/thread2/thread3分別表示三個檢查指標,三個執行緒執行完成之後,點火執行緒通過await()方法判斷三個執行緒是否執行成功,成功則繼續執行點火執行緒,否則繼續等待。
public class CountDownLatchTest {
static final CountDownLatch cd = new CountDownLatch(3); // 需要等待3個執行緒
static class CheckOne implements Runnable {
@Override
public void run() {
try {
Thread.sleep(2000); // sleep模擬檢查
} catch (InterruptedException e) {
e.printStackTrace();
}
cd.countDown(); // 該執行緒執行完,cd計數器減1
System. out.println(Thread.currentThread().getName() + " :End!");
}
}
static class CheckTwo implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000); // sleep模擬檢查
} catch (InterruptedException e) {
e.printStackTrace();
}
cd.countDown(); // 該執行緒執行完,cd計數器減1
System. out.println(Thread.currentThread().getName() + " :End!");
}
}
static class CheckThree implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000); // sleep模擬檢查
} catch (InterruptedException e) {
e.printStackTrace();
}
cd.countDown(); // 該執行緒執行完,cd計數器減1
System.out.println(Thread.currentThread().getName() + " :End!");
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new CheckOne(), "thread1");
thread1.start();
Thread thread2 = new Thread(new CheckTwo(), "thread2");
thread2.start();
Thread thread3 = new Thread(new CheckThree(), "thread3");
thread3.start();
cd.await(); // 判斷cd計數器是否為0,否則繼續等待
System.out.println(Thread.currentThread().getName() + ": ok!");
}
}
執行結果:
thread1 :End!
thread2 :End!
thread3 :End!
main: ok!
原始碼分析
構造方法:
public CountDownLatch(int count); // count表示計數器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 呼叫內部類Sync的構造方法
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
CountDownLatch內部類Sync繼承自AbstratcQueuedSynchronizer;所以其同步性利用AQS(AbstratcQueuedSynchronizer)框架實現。AQS內部維持一個volatile修飾的int型別的state;此處構造的引數count即為state賦值;
比較重要的方法有:
public void countdown(); // 呼叫一次 計數器減1
public void countDown() {
sync.releaseShared(1); // 實則呼叫sync的tryReleaseShared方法
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1; // 對status值減1
if (compareAndSetState(c, nextc)) // 通過原子操作把改變後的status值寫入記憶體中
return nextc == 0;
}
}
某執行緒執行完呼叫countDown(),表示所等待的執行緒數減1;
public void await() throws InterruptedException; // 等待的執行緒呼叫await,判斷計數器是否為0
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 如果呼叫await的執行緒被阻塞,則丟擲異常
if (tryAcquireShared(arg) < 0) // status值為0,則等待結束,否則繼續執行
doAcquireSharedInterruptibly(arg); // 繼續阻塞執行緒
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // 根據status值是否為0,返回1或者-1
}
很明顯,呼叫await()方法的執行緒通過判斷state的值是否為零選擇執行還是阻塞;
public boolean await(long timeout,TimeUnit unit) throws InterruptedException; //
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
此方法與await()類似,只不過等待有限;若到達等待時間status值不為0則直接執行不等待;
注意事項
CountDownLatch需要明確等待的條件,即確定引數值。