併發工具類 countDownLatch、CyclicBarrier與Semaphore
阿新 • • 發佈:2018-11-05
1、等待多執行緒完成的 CountDownLatch
CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作。
它的建構函式接受一個int型別的引數作為計數器,如果想等待N個點完成,這裡傳入N即可。
呼叫countDown方法時,N就會減1,await方法會阻塞當前執行緒直到N為0時。N個點可以是N個執行緒,也可以是同一執行緒中的N個點。
比如如下程式碼實現等待N個執行緒執行併發並等待N個執行緒結束。
package cn.zhjw.eurekaclientnode2.common; import java.io.IOException; import java.util.concurrent.CountDownLatch; /** * @Desc: * @Author: zhaojiwei * @Date: 2018/10/29 22:56 */ public class ThreadSyn { CountDownLatch latchOne = new CountDownLatch(1); CountDownLatch latchTen = new CountDownLatch(10); public static void main(String[] args) throws IOException { ThreadSyn threadSyn = new ThreadSyn(); threadSyn.testCountDownLatch(); System.in.read(); System.out.println("##############"); } public void testCountDownLatch() { int m = 10; for (int i = 0; i < m; i++) { Thread thread = new Thread(new MyRunnable(latchOne, latchTen)); thread.start(); } System.out.println("開始併發執行。。。"); latchOne.countDown(); try { latchTen.await(); System.out.println("############### All Thread Done!!#################"); } catch (InterruptedException e) { e.printStackTrace(); } } private class MyRunnable implements Runnable { private CountDownLatch countDownLathchOne; private CountDownLatch countDownLatchTen; public MyRunnable(CountDownLatch latchOne, CountDownLatch latchTen) { this.countDownLatchTen = latchTen; this.countDownLathchOne = latchOne; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { System.out.println("到達等待點 。。。" + Thread.currentThread().getName()); countDownLathchOne.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("running 。。。" + Thread.currentThread().getName()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //執行後減去訊號量 countDownLatchTen.countDown(); } } }
2、同步屏障 CyclicBarrier
顧名思義:可迴圈使用的屏障。它所要做的事情是讓一組執行緒到達一個屏障(同步點)時阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被攔截的執行緒才會繼續執行。
CyclicBarrier 與 CountDownLatch的區別:
- CountDownLatch 計數器只能使用一次
- CyclicBarrier 的計數器可通過方法reset() 重置,適合處理更復雜的業務場景;比如計算錯誤可以重置計數器重新計算一次;
- CyclicBarrier 還有其他可用方法,比如getNumberWating() 可以忽的阻塞的執行緒數,isBorken()用來了解阻塞的執行緒是否被中斷。
比如多執行緒彙總資料後,計算總資料。
package cn.zhjw.eurekaclientnode2.common; import java.util.Map; import java.util.Set; import java.util.concurrent.*; /** * @Desc:迴圈屏障 * @Author: zhaojiwei * @Date: 2018/11/1 15:52 */ public class ThreadSynCyclicBarrier { static final int COUNT = 10; /** * 假設需要執行4個任務,開啟4個執行緒去執行 */ private Executor executor = Executors.newFixedThreadPool(COUNT); private ConcurrentHashMap<String, Integer> bankWaterCount = new ConcurrentHashMap<>(); /** * 迴圈屏障,都到達同步點時,優先執行回撥操作barrierAction */ private CyclicBarrier cyclicBarrier = new CyclicBarrier(COUNT, new BankWaterService(bankWaterCount)); public static void main(String[] args) { ThreadSynCyclicBarrier cb = new ThreadSynCyclicBarrier(); cb.count(); System.out.println("$#################"); } /** * 模擬多執行緒計算 */ public void count() { for (int i = 0; i < COUNT; i++) { executor.execute(new Runnable() { @Override public void run() { //計算當前sheet中銀流資料,計算程式碼省略 bankWaterCount.put(Thread.currentThread().getName(), Integer.valueOf(1)); //銀流計算完後,插入一個屏障 try { System.out.println(Thread.currentThread().getName()+" => await..."); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } } class BankWaterService implements Runnable { private ConcurrentHashMap<String, Integer> bankWaterCount; public BankWaterService(ConcurrentHashMap<String, Integer> bankWaterCount) { this.bankWaterCount = bankWaterCount; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { int result = 0; Set<Map.Entry<String, Integer>> entries = bankWaterCount.entrySet(); for (Map.Entry<String, Integer> entry : entries) { result += entry.getValue(); } //將結果輸出 bankWaterCount.put("result", result); System.out.println(result); } } }
3、控制併發量的訊號量 Semaphore
Semaphore 訊號量用於控制多執行緒最大併發度(併發數量)。它協調多執行緒合理利用公共資源。
Semaphore可用於流量的控制,特別是公共資源有限的應用場景,比如資料庫連線。
package cn.zhjw.eurekaclientnode2.common;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @Desc: 訊號量併發流量的限制
* @Author: zhaojiwei
* @Date: 2018/11/1 16:56
*/
public class ThreadSynSemaphore {
private final static int THREAD_COUNT = 30;
private final static int SEMAPHORE_COUNT = 10;
/**
* 30個執行緒
*/
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
/**
* 10個訊號量 控制併發度
*
* @param args
*/
private static Semaphore semaphore = new Semaphore(SEMAPHORE_COUNT);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save data!!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
});
}
threadPool.shutdown();
}
}
Semaphore 建構函式接受一個int型別的引數,表示可用的許可證,通過調研 acquire()獲取一個許可證,用完後通過反覆release() 歸還許可證即可。還可以通過tryAcquire()方法嘗試獲取許可證。
4、執行緒間交換資料的 交換者 Exchanger
Exchanger 是用於執行緒間交換資料的工具類。它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的 資料。
兩個執行緒同步Exchanger交換資料,第一個執行緒如果先執行exchange()方法,它會一直等待第二個執行緒也執行exchange()方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產的資料傳遞給對方。
應用場景:
- 遺傳演算法
- 校對工作
package cn.zhjw.eurekaclientnode2.common;
import java.util.concurrent.*;
/**
* @Desc: 執行緒間交換資料
* @Author: zhaojiwei
* @Date: 2018/11/1 17:15
*/
public class ThreadSynExchanger {
/**
* 交換字串資料
*/
private static final Exchanger<String> exchanger = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
/**
* A 錄入銀行流水資料
*/
String A = "銀行流水A";
try {
String exchange = exchanger.exchange(A, 1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
/**
* B 員工錄入的銀行流水資料
*/
String B = "銀行流水B";
try {
String A = exchanger.exchange(B, 1, TimeUnit.MINUTES);
System.out.println("A和B的錄入資料是否一致:" + A.equals(B) + ",A錄入的是:" + A + ",B 錄入的是:" + B);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
});
}
}
參考:《JAVA 併發程式設計的藝術》