Java併發程式設計的藝術之八----java中的併發工具類
1.等待多執行緒完成的countDownLatch
CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作。
執行緒中,讓一個執行緒等待最簡單的做法是使用join方法,執行緒A中呼叫B.join方法,說明讓執行緒A等待執行緒B完成之後再執行。
實現原理:不停檢查執行緒是否存活,如果join執行緒存活則讓當前執行緒永遠等待。Wait(0)表示永遠等待下去
直到join執行緒終止後,執行緒的this.notifyAll方法被呼叫
CountDownLatch接受一個int型別的引數作為計數器,如果你想等待n個點完成,這裡就傳入n。呼叫countDownLatch的countDown方法時,n就會減1,如果計數器大於0,await方法等待,如果計數器等於零,await方法不等待。countDown方法可以用在任何地方,可以是n個執行緒,也可以1個執行緒裡n個步驟。
ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(3); for(int Runnable runnable = new Runnable(){ @Override public void run() { // TODO Auto-generated method stub try { System.out.println("執行緒" + Thread.currentThread().getName() + "正在準備接受命令"); cdOrder.await(); System.out.println("執行緒" + Thread.currentThread().getName() + "已接收命令"); Thread.sleep((long)Math.random()*1000); System.out.println("執行緒" + Thread.currentThread().getName() + "迴應命令處理結果"); cdAnswer.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; service.execute(runnable); } try{ Thread.sleep((long)Math.random()*1000); System.out.println("執行緒" + Thread.currentThread().getName() + "即將釋出命令"); cdOrder.countDown(); System.out.println("執行緒" + Thread.currentThread().getName() + "已傳送命令,正在等待結果"); cdAnswer.await(); System.out.println("執行緒" + Thread.currentThread().getName() + "已收到所有響應結果"); }catch(Exception e){ e.printStackTrace(); } service.shutdown();
|
2.同步屏障CyclicBarrier
可迴圈使用的屏障,讓一組執行緒到達一個屏障(同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。
假設CyclicBarrier=3,如果阻塞沒有到達三個,那麼await方法就會一直等待,如果到達三個,那麼就不會阻塞。
應用場景:
接受指令,有三個執行緒,需要到達指定地點集合,第一個執行緒先到,呼叫await方法,第二個執行緒也是一樣,等到第三個執行緒到了,最後一個執行緒到達屏障,則await開始執行。
CyclicBarrier和CountDownLatch的區別
CountDownLatch的計數器只能用一次,而CyclicBarrier計數器可以使用reset重置,其實不用手動重置,await到達最後一個的時候,就會自動將計數器置為初始化個數,CyclicBarrier能處理更為複雜的業務,
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的執行緒數量。isBroken()方法用來了解阻塞的執行緒是否被中斷
3.控制併發執行緒數的semaphore
控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源
* semaphore實現的功能就類似廁所有5個坑,假如有十個人要上廁所
* ,那麼同事能有多少個人去上廁所呢,同時只有5個人能夠佔用,當5個人
* 中的任何一個人讓開後,其中在等待的另外5個人中又有一個人可以佔用了
*
* 另外等待的5個人中可以隨機獲得優先機會,可以是按照先來後到的順序獲得機會
* ,這取決於構造semaphore物件時傳入的引數選項
* final Semaphore sp = new Semaphore(3, true);true表示按照先來後來順序
*
*單個訊號量的semaphore物件可以實現互斥鎖的功能,並且可以由一個執行緒
*獲得鎖,另一個執行緒釋放鎖,應用於死鎖回覆的場合
public class SemaphoreTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore sp = new Semaphore(3); for(int i = 0; i < 10; i++){ Runnable runnable = new Runnable(){ public void run(){ try{ sp.acquire(); }catch(InterruptedException e1){ e1.printStackTrace(); } System.out.println("執行緒" + Thread.currentThread().getName() + "進入,當前已有" + (3 - sp.availablePermits()) + "併發"); try{ Thread.sleep((long)(Math.random() * 1000)); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("執行緒" + Thread.currentThread().getName() + "即將離開"); sp.release(); //下面程式碼有時候執行不準確,因為其沒有和上面程式碼合成原子步驟 System.out.println("執行緒" + Thread.currentThread().getName() + "已離開,當前已有" + (3-sp.availablePermits()) + "併發"); } }; service.execute(runnable); } } } |
首先執行緒使用Semaphore的acquire()方法獲取一個許可證,如果semaphore小於0,就阻塞使用完之後呼叫release()方法歸還許可證。
4.執行緒間交換資料的Exchanger
它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange方法交換資料,如果第一個執行緒先執行exchange()方法,它會一直等待第二個執行緒也執行exchange方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方
public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); service.execute(new Runnable(){ @Override public void run() { // TODO Auto-generated method stub try{ String data1 = "zxx"; System.out.println("執行緒" + Thread.currentThread().getName() + "正在把資料" + data1 + "換出去"); Thread.sleep((long) (Math.random()*1000)); String data2 = (String) exchanger.exchange(data1); System.out.println("執行緒" + Thread.currentThread().getName() + "換回的資料為" + data2); }catch(Exception e){ e.printStackTrace(); } } });
service.execute(new Runnable(){ @Override public void run() { // TODO Auto-generated method stub try{ String data1 = "lhm"; System.out.println("執行緒" + Thread.currentThread().getName() + "正在把資料" + data1 + "換出去"); Thread.sleep((long) (Math.random()*1000)); String data2 = (String) exchanger.exchange(data1); System.out.println("執行緒" + Thread.currentThread().getName() + "換回的資料為" + data2); }catch(Exception e){ e.printStackTrace(); } } });
service.shutdown();
} |