Java併發工具類
目錄
在JDK的併發包(java.util.concurrent)裡提供了幾個非常有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了線上程間交換資料的一種手段。
1.等待多執行緒完成的CountDownLatch
CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作,類似於join方法。join的實現原理是不停的檢查join執行緒是否存活,如果join執行緒存活則讓當前執行緒永遠等待。CountDownLatch經常用於監聽某些初始化操作,等初始化執行完畢後,通知主執行緒繼續工作。
CountDownLatch的建構函式接收一個int型別的引數作為計數器,如果你想等待N個點完成,這裡就傳入N。當呼叫CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前執行緒,直到N變成零。由於CountDownLatch的countDown方法可以用在任何地方,所以這裡說的N個點,可以是N個執行緒,也可以是1個執行緒裡的N個執行步驟。當然,如果一個執行緒執行很慢的話,我們不可能讓主執行緒一直等待下去,所以可以使用另外一個帶有指定時間的await方法,await(long time,TimeUnit unit)這個方法等待特定時間後,就會不再阻塞當前執行緒。
實現的例項程式碼如下:
public class CountDownLatchTest { static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(1); countDownLatch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println(2); countDownLatch.countDown(); } }).start(); countDownLatch.await(); System.out.println("thread finished"); } }
2.同步屏障CyclicBarrier
CylicBarrier讓一組執行緒到達一個屏障(同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。假設每個執行緒代表一個跑步運動員,當運動員都準備好以後,才一起出發,只要有一個人沒有準備好,大家都等待。CylicBarrier預設的構造方法是CylicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CylicBarrier已經到達屏障,然後當前執行緒被阻塞。
例項程式碼如下:
public class CyclicBarrierTest {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(2);
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
System.out.println("thread finished");
}
}
如果把建構函式更改為3,則主執行緒和子執行緒永遠都不會得到執行,因為沒有第3個執行緒執行await方法,即沒有第3個執行緒到達屏障,所以之前到達屏障的兩個執行緒永遠不會繼續執行。
CyclicBarrier還提供了一個更加高階的建構函式CyclicBarrier(int parties,Runnable barrierAction),用於線上程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。CylicBarrier可以用於計算資料,最後合併計算結果的場景。例項程式碼:
public class CyclicBarrierTest implements Runnable {
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, this);
private Executor executor = Executors.newFixedThreadPool(4);
private ConcurrentMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
private void calculate() {
for (int i = 0; i < 4; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
concurrentMap.put(Thread.currentThread().getName(), 1);
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) throws InterruptedException {
CyclicBarrierTest cyclicBarrierTest = new CyclicBarrierTest();
cyclicBarrierTest.calculate();
Thread.sleep(1000);
System.out.println(cyclicBarrierTest.concurrentMap.get("result"));
System.out.println(cyclicBarrierTest.cyclicBarrier.getNumberWaiting());
System.out.println(cyclicBarrierTest.cyclicBarrier.isBroken());
}
@Override
public void run() {
Integer result = 0;
for (Map.Entry<String, Integer> entry : concurrentMap.entrySet()) {
result += entry.getValue();
}
concurrentMap.put("result", result);
}
}
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset方法重置。所以CyclicBarrier可以處理更加複雜的業務場景。例如,如果計算髮生錯誤,可以重置計數器,並讓執行緒重新執行一次。
3.控制併發執行緒數的Semaphore
Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒以保證合理的使用公共資源。Semaphore可以用於做流量控制,特別是公共資源有限的應用場景,比如資料庫的連線。
例項程式碼:
public class SemaphoreTest {
private Executor executors = Executors.newFixedThreadPool(20);
private Semaphore semaphore = new Semaphore(5);
private void linkDataBase() {
for (int i = 0; i < 20; i++) {
executors.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("link data base");
semaphore.release();
}
});
}
}
public static void main(String[] args) {
SemaphoreTest semaphoreTest = new SemaphoreTest();
semaphoreTest.linkDataBase();
}
}
雖然有20個執行緒在執行,但是隻允許5個併發執行。Semaphore的用法非常的簡答,首先執行緒使用Semaphore的acquire()方法獲得一個許可證,使用完之後呼叫release()方法歸還許可證,還可以使用tryAcquire()方法嘗試獲取許可證。Semaphore還提供了一些其他的方法:
4.執行緒間交換資料的Exchanger
Exchanger是一個用於執行緒間協作的工具類。Exchanger用於進行執行緒間的資料交換。它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchanger()方法交換資料,如果一個執行緒先執行exchanger()方法交換資料,它會一直等待第二個執行緒也執行exchanger方法,當兩個執行緒到達同步點的時候,兩個執行緒可以將本執行緒生產出來的資料傳遞給對方,交換資料。
public class ExchangerTest {
private Executor executor = Executors.newFixedThreadPool(2);
private Exchanger exchanger = new Exchanger();
private void exchangeData() {
executor.execute(new Runnable() {
@Override
public void run() {
String A = "A";
try {
String B = (String) exchanger.exchange(A);
System.out.println("B執行緒的值:" + B);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
executor.execute(new Runnable() {
@Override
public void run() {
String B = "B";
try {
String A = (String) exchanger.exchange(B);
System.out.println("A執行緒的值:" + A);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) {
ExchangerTest exchangerTest = new ExchangerTest();
exchangerTest.exchangeData();
}
}
如果兩個執行緒有一個沒有執行exchange()方法,則會一直等待,為了避免一直等待,可以使用exchange(V x,long timeout,TimeUnit unit)設定最大等待時長。