Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger
CountDownLatch允許一個或者多個線程等待其他線程完成操作,之後再對結果做統一處理;
適用場景,分布式系統中對多個微服務的調用,並發執行並且必須等待全部執行完成才能繼續執行後續操作;
其實在java中默認的實現是join()方法,join()方法主要的作用是當前線程必須等待直到join線程執行完成之後才能繼續執行後續的操作,
其本質就是輪詢判斷join線程是否存活,如果存活則主線程繼續等待,否則,通過調用this.notifyAll()方法來繼續執行主線程。
實例代碼如下:
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("this is thread 1");
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Thread2 is finish");
}
});
thread1.start();
thread2.start();
/*thread1.join();
thread2.join();*/ (1)
System.out.println("all parser finish");
}
現在的代碼是註釋掉了兩個join()方法的調用,那麽輸出結果將不能被保證,三個sout的輸出打印是亂序的。
如果將上述的註釋(1)去掉,則根據join()方法的定義,可以知main線程會先等待thread1的執行結束才會執行thread2的執行,直到thread2執行結束才會繼續往下執行輸出:
"all parser finish";從而保證執行順序固定,即線程thread1先執行,其次是thread2的執行,最後main線程執行最後的輸出。
那麽同樣的如果我們用CountDownLatch來實現,則應用代碼如下:
static CountDownLatch c = new CountDownLatch(2);//定義成員變量
public static void main(String[] args) throws InterruptedException{
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println(3);
}
其中定義的CountDownLatch(2)表示等待兩個點完成,即當c變成0以後當前線程才會繼續執行後續代碼,否則由於await()方法,線程會一直等待;
而每次調用countDown()方法則c就會減一,上述代碼在輸出1,2之後,因為調用兩次countDown之後c變成0,那麽c.await()方法會失效,然後main()線程執行最後輸出3;
如果我們要等待的是多個線程的並發執行,則代碼如下
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("this is thread 1");
try {
Thread.sleep(100l);
} catch (InterruptedException e) {
e.printStackTrace();
}
c.countDown();
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Thread2 is finish");
c.countDown();
}
});
thread1.start();
thread2.start();
c.await();
System.out.println("all parser finish");
}
上述代碼中我們可以保證的是main線程會等待thread1和thread2線程的執行完成,之後再執行最後的打印,但是不保證thead1和thread2執行的先後順序即有可能thread1先執行,也有可能thread2先執行;在這一點上有別與join方法,join方法可以保證其是按照調用順序來執行的。
註意:在使用CountDownLatch()的過程中必須保證count次數大於0,因為只有count次數大於0才能保證await()方法調用的阻塞。
等待多線程完成的CountDownLatch和join()方法的使用就到這裏結束了。
CyclicBarrier:同步屏障,作用是使得一組線程到達一個同步點時被阻塞,直到所有線程都到達屏障時,屏障才會消失,所有被攔截的線程才可以繼續執行。
CyclicBarrier的使用方式和CountDownLatch類似;實例代碼如下:
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(2);
}
上述代碼的執行結果可能是1,2也可能是2,1,c並沒有保證線程的順序,從目前來看,CyclicBarrier和CountDownLatch幾乎實現的是一樣的功能。
但是CyclicBarrier有更強大的功能,即通過構造函數:new CyclicBarrier(int parties,Runnable barrierAction)來保證線程到達同步點的時候,優先執行barrierAction中的任務。實例代碼如下:
static CyclicBarrier c = new CyclicBarrier(2, new PrThread());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(2);
}
static class PrThread implements Runnable {
@Override
public void run() {
System.out.println(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
其中輸出順序被保證為3,1,2,因為count設置為2,所以必須在第一個線程和線程PrThread執行完成之後才能執行主線程,完成輸出。
CyclicBarrier適用於多線程計算最後合並結果的場景。
還有一點就是CountDownLatch()方法只能用一次,而CyclicBarrier可以通過reset()方法重復調用。至於其他方法比如getNumberWaiting可以獲取CyclicBarrier阻塞的線程數等。
對CyclicBarrier的使用到這就結束了。
Semaphone:信號量是用來控制同時訪問特定資源的線程數量,以保證合理使用有限的公共資源。常用場景是流量控制。實例代碼如下:
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i <THREAD_COUNT ; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("DO SOMETHING FOR YOURSELF"+s.getQueueLength());
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
,其中構造線程池大小為30,而在同一時刻只允許10個線程執行輸出;s.acquire()要求獲取一個許可,而s.release()表示釋放獲取到的許可,當線程數超過可用的許可數,則進入等待狀態,直到有許可可用,才會繼續執行下一個任務。
信號量的使用到這裏就結束了,我們最後再說線程間的交換數據,其實就是線程之間的數據傳遞:
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String thread1 = "要交換的數據1";
exgr.exchange(thread1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String thread2 = "要交換的數據1";
String exchange = exgr.exchange("thread2");
System.out.println(exchange.equals(thread2));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.shutdown();
}
};
其中exchange為從thread1中拿到的要對比的數據,然後和thread2做對比,如果是相當則輸出true.
其中exgr攜帶了需要交互的數據信息。
到此Exchanger的使用結束。
並發編程中用的比較多的就是CountDownLatch和CyclicBarrier和Semaphore。所以了解這些有助於我們以後更好的編程
本文出自 “用行動,證明你” 博客,請務必保留此出處http://wuseeker.blog.51cto.com/5759064/1945512
Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger