同步工具類--閉鎖、訊號量、柵欄的總結
閉鎖用於一組執行緒等待(阻塞)一個外部事件的發生,這個事件發生之前這些執行緒阻塞,等待控制執行緒開啟閉鎖,然後這些執行緒同時開始執行。閉鎖強調的是阻塞後的同時開始;柵欄則是一組執行緒相互等待,直到所有執行緒都到達某一點時才打開柵欄,然後執行緒可以繼續執行,也就是說控制執行緒先設定一個時間點,然後這些執行緒各自執行,執行完等待(阻塞),直到這組執行緒中的所有執行緒執行完,然後控制執行緒柵欄開啟,這些執行緒同時繼續執行。柵欄強調的是各自執行完後的相互等待以及繼續執行。訊號量根據一個計數器控制一個結果的數量,條件滿足情況下才能進行增加和移除操作,否則進行操作的執行緒阻塞。
工具 |
作用 |
主要方法 |
|
閉鎖 (CountDownLatch) |
類似於門。門初始是關閉的,試圖進門的執行緒掛起等待開門。當負責開門程序將門開啟後,所有等待執行緒被喚醒。 門一旦開啟就不能再關閉了。 |
CountDownLatch(int n):指定閉鎖計數器 await() :掛起等待閉鎖計數器為0 countDown():閉鎖計數器減1 |
|
柵欄 (CyclicBarrier) |
和閉鎖有類似之處。閉鎖是等待“開門”事件;柵欄是等待其他執行緒。例如有N個執行緒檢視通過柵欄,此時先到的要等待,直到所有執行緒到到達後,柵欄開啟,所有等待執行緒被喚醒通過柵欄。 |
CyclicBarrier(int n):需要等待的執行緒數量 await():掛起等待達到執行緒數量 |
|
訊號量 (Semaphore) |
和鎖的作用類似。區別是鎖只允許被一個執行緒獲取,但是訊號量可以設定資源數量。當沒有可用資源時,才被掛起等待。 |
Semaphore(int n):指定初始的資源數量 acquire():試圖獲取資源。當沒有可用資源時掛起 release():釋放一個資源 |
場景對比:
l 閉鎖場景:幾個人相約去公園遊玩,在家做好準備,約定在某一時刻同時出發去公園,準備工作進行的快的不能提前出門,到點出門。
l 柵欄場景:幾個人相約去公園遊玩,幾個人去到公園門口,要等全部到達公園門口後才一起進入公園。
l
CountDownLatch:
public class TestHarness{
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{
final CountDownLatch startGate = new CountDownLatch(1);//全部執行緒開始的閉鎖
final CountDownLatch endGate = new CountDownLatch(nThread);//全部執行緒結束的閉鎖
for(int i = 0 ; i < nThreads ; i++){
Thread t = new Thread(){
public void run(){
try{
startGate.await();//所有的執行緒都在這裡等待閉鎖開啟
try{
task.run();//實際要呼叫的方法
}finally{
endGate.countDown(); //每一個執行緒執行完畢,呼叫countDown()方法,直到全部的執行緒(nThreads個)執行完畢,閉鎖開啟
}
}catch(InterruptedException e){}
}
};
t.start(); //t.start()不會立即開始執行task.run()方法,執行緒一開始執行startGate.await();會等待閉鎖startGate開啟
}
long start = System.nanoTime();//記錄開始時間
startGate.countDown();//閉鎖startGate初始化為1,執行一次countDown()方法,閉鎖就打開了,所有的執行緒可以繼續執行
endGate.await();//當前執行緒在這裡等待所有的執行緒執行完畢
long end = System.nonaTIme();//記錄結束時間
return end - start;
}
}
semaphore :
public class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore sem;
//構造方法,初始化邊界
public BoundedHashSet(int bound){
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);//初始化許可集的大小,即該有界阻塞佇列的大小
}
public boolean add(T o) throws InterruptedException{
sem.acquire();//新增元素時獲取一個許可,如果剩餘許可為0,則說明此有界阻塞佇列已滿
boolean wasAdded = false; //返回值,預設為新增元素失敗。
try{
wasAdded=set.add(o);//向佇列中新增元素,如果成功,則返回成功
return wasAdded;
}finally{
if(!wasAdded) sem.release(); //如果新增失敗,把方法一開始獲取的許可釋放掉
}
}
public boolean remove (Object o){
boolean wasRemoved = set.remove(o);
if(wasRemoved) sem.release();//如果刪除成功,則釋放一個許可
return wasRemoved;
}
}
barrier:
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow); //每一個執行緒負責處理一行資料
try {
barrier.await(); //處理完行資料之後在這個關卡等待其他執行緒,當所有的執行緒都處理完畢,才繼續向下執行
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(...); //彙總計算的結果語句
}//關卡CyclicBarrier 初始化時引數一為:關卡需要等待的執行緒的數量n,只有當n個執行緒全部到達此關卡,才能順利通過;引數二為:一個關卡行為,當關卡順利通過之後,在一個子任務執行緒中執行這個行為。Runnable型別~
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start(); //每一行一個執行緒處理,分別啟動
waitUntilDone();
}
}