Java處理多執行緒併發相關類
自從JDK5釋出以來,在java.util.concurrent包中提供了一些非常有用的輔助類來幫助我們進行併發程式設計,下面就介紹一下這些輔助類中的Semaphore、CyclicBarrier、CountDownLatch以及Exchanger的相關用法。
一、Semaphore
Semaphore是計數訊號量,是作業系統中的一個概念,經常用於限制獲取某種資源的執行緒數量,在new 這個類的時候需要給這個類傳遞一個引數permits,這個引數是整數型別,這個引數的意思是同一時間內,最多允許多少個執行緒同時執行acquire方法和release方法之間的程式碼,如果方法acquire沒有引數則預設是一個許可。例如售票視窗,假如有兩個視窗,有三十個使用者需要買票,控制併發進行程式設計,程式碼如下:
public class SemaphoreTest { class SemaphoreRunnable implements Runnable{ private Semaphore semaphore; private int user; public SemaphoreRunnable(Semaphore semaphore, int user) { this.semaphore = semaphore; this.user = user; } @Override public void run() { try { //獲取semaphore並且釋放 semaphore.acquire(); System.out.println("使用者"+user+"進入視窗買票。。。"); Thread.sleep((long)(Math.random()*10000)); System.out.println("使用者"+user+"已經買票。。。"); Thread.sleep((long)(Math.random()*10000)); System.out.println("使用者"+user+"離開視窗。。。"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void execute(){ final Semaphore semaphore = new Semaphore(2); ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0; i<20; i++){ threadPool.execute(new SemaphoreRunnable(semaphore, i+1)); } threadPool.shutdown(); } public static void main(String[] args) { SemaphoreTest st = new SemaphoreTest(); st.execute(); } }
充分的模擬了現實售票視窗的售票業務,semaphore.acquire()相當於進入了其中一個視窗,買完票後semaphore.release()相當於買票者離開了售票視窗,這種情況下才允許下一個買票者進入售票視窗。
二、CyclicBarrier
CyclicBarrier直譯過來叫做記憶體屏障,它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續下面的業務。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。與CountDownLatch不同的是該barrier在釋放等待執行緒後可以重用,所以稱它為迴圈(Cyclic)的屏障(Barrier)。業務場景:假設公司突然開竅了,出錢讓程式設計師出去團建,肯定是等所有人員到達指定地點時,才發車,發車之前可能要拍一個集體照,加入就三個程式設計師,業務程式碼如下:
public class CyclicBarrierTest {
public static void main(String[] args) {
final CyclicBarrier cb = new CyclicBarrier(3,new Runnable() {
@Override
public void run() {
System.out.println("人員已經到齊,開始拍照。。。");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
ExecutorService threadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 3; i++) {
final int coder = i+1;
Runnable r = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println(coder+"到達集合地點,當前已有"+(cb.getNumberWaiting()+1)+"到達。。。");
cb.await();
System.out.println("拍照完畢,開始出發。。。");
Thread.sleep((long)(Math.random()*1000));
System.out.println("到達遊玩地,"+coder+"開始下車。。。");
} catch (InterruptedException|BrokenBarrierException e) {
e.printStackTrace();
}
}
};
threadPool.execute(r);
}
threadPool.shutdown();
}
}
其中cb.await()相當於發出屏障指令,要等所有程式設計師到達目的地之後才進行下面的安排。檢視CyclicBarrier的原始碼,有兩個構造方法:
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier(int parties)中的parties只是指定了需要等待的執行緒數,而CyclicBarrier(int parties, Runnable barrierAction)則是追加了一個執行緒,這個執行緒可以等parties數目執行緒都到達觸發一些業務處理,如上面那個例子當中的人員到達後拍照留念。
三、CountDownLatch
CountDownLatch可以實現類似計數器的功能,計數器的初始值為指定的執行緒的數量,每當一個執行緒完成了自己的任務,計數器的值就會減1,當計數器的值達到了0時,它表示所有的執行緒都完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。構造器上的計數值實際上就是閉鎖需要等待的執行緒數量,這個值只能被設定一次,而且CountDownLatch沒有提供任何機制去重新設定這個值。
業務場景:運動員賽跑,要在同一時刻開始起跑,但是要等最後一個運動員到達終點時才會彙總成績,假如有8個運動員:
public class PlayerTest {
public static void main(String[] args) {
CountDownLatch beginLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(8);
for (int i = 0; i < 8; i++) {
new Thread(new PlayerTest().new Work(i+1, beginLatch, endLatch)).start();
}
try {
System.out.println("準備起跑。。。");
beginLatch.countDown();
endLatch.await();
System.out.println("成績彙總。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
class Work implements Runnable{
private int id;
private CountDownLatch beginLatch;
private CountDownLatch endLatch;
public Work(int id, CountDownLatch beginLatch, CountDownLatch endLatch) {
super();
this.id = id;
this.beginLatch = beginLatch;
this.endLatch = endLatch;
}
@Override
public void run() {
try {
beginLatch.await();
System.out.println("運動員{"+id+"}到達終點。。。");
endLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
四、Exchanger
Exchanger是用於執行緒間協作的工具類,用於執行緒間的資料交換,它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange方法交換資料, 如果第一個執行緒先執行exchange方法,它會一直等待第二個執行緒也執行exchange,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方。
業務場景:我們經常看一些販毒的警匪片,最經典的臺詞就是“一手交錢,一手交貨”,程式碼如下:
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
final Exchanger<String> ex = new Exchanger<>();
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String reStr = ex.exchange("一手交貨");
System.out.println("吸毒者:"+reStr);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String reStr = ex.exchange("一手交錢");
System.out.println("販毒者:"+reStr);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.shutdown();
}
}
吸毒者要想獲得毒品,就要換取“一手交錢”的資訊,販毒者要想獲得金錢,就要換取“一手交貨”的資訊,當雙方到達一個小黑屋的同步點時,就可以交錢收貨和交貨收錢了! 熟悉了各個併發程式設計提供的控制類的介紹和用法,在一些沒有必要使用重量級鎖的業務場景中,就可以應用併發揮它們的作用了。