同步工具類閉鎖,柵欄,訊號量
概述
同步工具類可以是任何一個類,只要它根據其自身的狀態來協調執行緒的控制流。阻塞佇列可以作為同步工具類,其他型別的同步工具還包括訊號量(Semaphore),柵欄(Latch),閉鎖(Latch).
閉鎖
閉鎖可以延遲執行緒的進度直到到達中止狀態,閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能通過,當到達結束狀態時,這扇門會開啟並允許所有的執行緒通過。
CountDownLatch
API:
CountDownLatch的建構函式接受一個int型別的引數作為計數器,如果你想要等待N個點完成,這裡就傳入N。
呼叫countDown()
,await()
方法會阻塞當前執行緒。之後,會釋放所有等待的執行緒,
的所有後續呼叫都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用await()
CyclicBarria
。由於countDown()
方法可以用在任何地方,所以這裡說的N個點可以是N個執行緒,也可以是1個執行緒裡的N個執行步驟。
(long
time,TimeUnit unit),這個方法等待特定時間後,就不會阻塞當前執行緒。await
簡單入門:
public class CountDownLatchTest { private static java.util.concurrent.CountDownLatch c=new java.util.concurrent.CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread( ){ @Override public void run(){ System.out.println(1); c.countDown(); System.out.println(2); c.countDown(); } }.start(); c.await(); System.out.println(3); } } 輸出: 1 2 3
進階用法: 下面給出了兩個類,其中一組 worker 執行緒使用了兩個倒計數鎖存器:
- 第一個類是一個啟動訊號,在 driver 為繼續執行 worker 做好準備之前,它會阻止所有的 worker 繼續執行。
- 第二個類是一個完成訊號,它允許 driver 在完成所有 worker 之前一直等待。
import java.util.concurrent.CountDownLatch; public class Worker implements Runnable{ private final CountDownLatch startSignal; private final CountDownLatch doneSignal; private CountDownLatch startSignal2; Worker(CountDownLatch startSignal,CountDownLatch doneSignal){ this.startSignal=startSignal; this.doneSignal=doneSignal; } public void run(){ try { startSignal.await(); work(); doneSignal.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }public void work(){ } }
public class Driver {
public void main() throws InterruptedException{
CountDownLatch startSignal=new CountDownLatch(1);//開始訊號
CountDownLatch doneSignal=new CountDownLatch(5);//結束訊號
for(int i=0;i<5;i++){
new Thread(
new Worker(startSignal,doneSignal)
).start();
}
doSomethingElse();
startSignal.countDown();//開始所有工作
doSomethingElse();
doneSignal.await();//等待所有的工作結束
}
public void doSomethingElse(){}
}
另一種典型用法是,將一個問題分成 N 個部分,用執行每個部分並讓鎖存器倒計數的 Runnable 來描述每個部分,然後將所有 Runnable 加入到 Executor 佇列。當所有的子部分完成後,協調執行緒就能夠通過 await。(當執行緒必須用這種方法反覆倒計數時,可改為使用CyclicBarrier
。)
public class Worker1 implements Runnable{
private final CountDownLatch doneSignal;
private final int i;
Worker1(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
public void doWork(int i) {
}
}
public class Driver1 {
public void main() throws InterruptedException{
CountDownLatch doneSignal=new CountDownLatch(1);
Executor c=Executors.newCachedThreadPool();
for(int i=0;i<4;i++){//create and start threads
c.execute(new Worker1(doneSignal,i));
}
doneSignal.await();//wait all finish
}
}
FutureTask
為了理解FutureTask,讓我們先來看看Future
Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。計算完成後只能使用get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ..
ArchiveSearcher searcher = ...
void showSearch(final String target)
throws InterruptedException {
Future<String> future
= executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
FutureTask
類是Future 的一個實現,Future 可實現Runnable,所以可通過Executor
來執行。例如,可用下列內容替換上面帶有submit 的構造:
FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);
訊號量
Semaphore
一個計數訊號量。從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個
acquire()
,然後再獲取該許可。每個
release()
新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore
只對可用許可的號碼進行計數,並採取相應的行動。
計算訊號量的一種簡化形式是二值訊號量,即初始值為1的Semaphore,二值訊號量可以用做互斥體,並具備不可重入的加鎖語義。
Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目。例如,下面的類使用訊號量控制對內容池的訪問:
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
柵欄(屏障)
CyclicBarrier
CyclicBarrier的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫做同步點)時阻塞,
直到最後一個執行緒到達屏障時,屏障才會開啟,所有被屏障攔截的執行緒才會繼續執行。同時,Barrier在釋放等待執行緒之後可以重用。
CyclicBarrier支援一個可選的Runnable命令
,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作
很有用。
柵欄與閉鎖的關鍵區別:
1.閉鎖是一次性物件,柵欄可以重用,也可以reset()重置。
2.所有執行緒必須都到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其他執行緒。
示例用法:使用者多執行緒計算資料,最後合併計算結果的場景。
一個Excel儲存了使用者所有的銀行流水,每個Sheet儲存了一個賬戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個sheet裡的銀行流水,都執行完了之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些執行緒的計算結果,計算整個Excel的日均銀行流水。
程式碼如下:
<span style="color:#333333;">pa<span style="color:#330033;">ckage cn.dachao.thread;
import java.util.Map;
import java.util.concurrent.*;
/**
* Created by dachao on 16-7-28.
*/
public class BankWaterService implements Runnable{
/*
建立4個屏障,處理完之後執行當前類的run方法
*/
private CyclicBarrier c=new CyclicBarrier(4,this);
/*
假設只有4個sheet,所以只啟動4個執行緒
*/
private Executor executor= Executors.newFixedThreadPool(4);
/*
儲存每個sheet計算出的銀流結果
*/
private ConcurrentHashMap<String,Integer> count=new ConcurrentHashMap<>();
private void count(){
for(int i=0;i<4;i++){
executor.execute(new Runnable() {
@Override
public void run() {
//計算當前sheet的銀行資料,程式碼省略
count.put(Thread.currentThread().getName(),1);
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void run(){
int result=0;
//彙總每個sheet計算出的結果
for(Map.Entry<String,Integer> sheet:count.entrySet()){
result+=sheet.getValue();
}
//將結果輸出
count.put("result",result);
System.out.println(result);
}
public static void main(String[]args){
BankWaterService service=new BankWaterService();
service.count();
}
}
輸出結果:
<span style="color:#333333;">4</span>
</span></span>
Exchanger
可以在對中對元素進行配對和交換的執行緒的同步點。每個執行緒將條目上的某個方法呈現給
exchange
方法,與夥伴執行緒進行匹配,並且在返回時接收其夥伴的物件。Exchanger 可能被視為
SynchronousQueue
的雙向形式。Exchanger 可能在應用程式(比如遺傳演算法和管道設計)中很有用。
用法示例:以下是重點介紹的一個類,該類使用 Exchanger
線上程間交換緩衝區,因此,在需要時,填充緩衝區的執行緒獲取一個新騰空的緩衝區,並將填滿的緩衝區傳遞給騰空緩衝區的執行緒。
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}