同步計數器CountDownLatch 和CyclicBarrier
阿新 • • 發佈:2018-12-20
CountDownLatch ,把一個工作分給5個人,5個執行緒都執行完了,呼叫countDown,給計數器減數,而主執行緒await,等數為零,主執行緒繼續往下執行,即5條執行緒都完成才算工作完成。
內部很簡單,還是繼承AQS,把設定的數量賦值給state,countDown就減state,await想必掛起執行緒和解放執行緒。
CyclicBarrier 這麼打比方,5個執行緒,每個人的工作分成兩部分,完成第一部分後,需要確保5個人都完成了第一部分,大家才能各自進行第二部分。同時CyclicBarrier 可以設定一個主任務,在5個人的第一部分都執行完之後,這個主任務就會執行
CountDownLatch 是一個執行緒(例子中是主執行緒)等待其他執行緒執行完畢後,才能執行。
CyclicBarrier 是多個執行緒互相等待對方執行到指定的那一步,然後這些執行緒才能繼續執行
一.CountDownLatch
1.例子
public class TestCountDown { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(4); Work w1 = new Work(latch); Work w2 = new Work(latch); Work w3 = new Work(latch); w1.start(); w2.start(); w3.start(); latch.await(); System.out.println("latch要等計數器為0,await方法才能釋放"); } static class Work extends Thread{ private CountDownLatch latch; @Override public void run() { System.out.println(Thread.currentThread().getName()+"正在工作"); latch.countDown(); } public Work(CountDownLatch latch) { super(); this.latch = latch; } } }
主執行緒只有等三個執行緒都執行完,計數器為零後,await方法才釋放,才執行
如果改為CountDownLatch latch = new CountDownLatch(4);
會發現主執行緒一直堵塞,因為計數器永遠無法到0
2.應用場景
開五個執行緒去上傳或下載,只有五個執行緒都成功才算成功
3.原始碼
new CountDownLatch(4);數量其實是Sync的state
public class CountDownLatch { /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
countDown其實就是在對state減一,到0時就不能減了
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
await應該是計算state是否=0,不等於就堵塞
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
二.CyclicBarrier
1.例子
public class TestCyclicBarrier {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3,new TotalTask());
Work w1 = new Work(barrier);
Work w2 = new Work(barrier);
Work w3 = new Work(barrier);
w1.start();w2.start();w3.start();
System.out.println("z主執行緒");
}
static class TotalTask extends Thread{
@Override
public void run() {
System.out.println("所有任務的第一部分都執行完,總任務就會執行");
}
}
static class Work extends Thread{
private CyclicBarrier barrier;
public Work(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行工作的第一部分");
try {
barrier.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"執行工作的第二部分");
}
}
}