Java併發——Java中的併發類工具
在JDK的併發包裡面提供了幾個非常有用的工具類:CountDwonLatch、CyclicBarrier、Semaphore、Exchanger。
其中CountDwonLatch、CyclicBarrier、Semaphore工具類提供了一種併發流程控制的手段,Exchanger提供了一種線上程間交換資料的手段。
一、四種併發工具類
1、CountDwonLatch(閉鎖)
CountDwonLatch用來等待一個或者多個執行緒完成操作,作用類似於當前執行緒裡呼叫join()方法,讓當前執行緒等待join()進來的執行緒執行完畢再執行當前執行緒剩下的邏輯,但是CountDownLatch比join()的功能更加強大,使用方法如下:
import java.util.concurrent.CountDownLatch; class test{ static CountDownLatch cdl = new CountDownLatch(2); //①新建一個CountDwonLatch物件並傳入計數器的值 public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { try{ System.out.println("thread 1: 1 "); Thread.sleep(1000); System.out.println("thread 1 : 2"); cdl.countDown(); //②在被等待的執行緒或者步驟執行完畢後呼叫countDwon()方法讓計數器減1 } catch(InterruptedException e){ e.printStackTrace(); } } }).start(); new Thread(new Runnable(){ @Override public void run(){ try{ System.out.println("thread 2 : 1"); Thread.sleep(1000); System.out.println("thread 2 : 2"); cdl.countDown(); } catch(InterruptedException e){ e.printStackTrace(); } } }).start(); cdl.await(); //③在等待的主執行緒中呼叫await()方法等待其他執行緒,知道計數器為0 再執行主執行緒接下來的邏輯 System.out.println("thread : main"); } }
輸出結果:
thread 1: 1
thread 2 : 1
thread 1 : 2
thread 2 : 2
thread : main
CountDwonLatch的使用方法:
在CountDwonLatch中,countDwon()方法和await()方法搭配使用才能起到類似join()的作用:
(1)首先建立一個CountDwonLatch物件並傳入要等待的執行緒的數量,這是個計數器;
(2)在被等待的執行緒或者步驟執行完畢後呼叫countDwon()方法讓計數器減1,countDwon()方法是一個等待的計數器,每次呼叫countDwon()方法,計數器減1,直到計數器為0,countDwon可以用在任何地方,可以是一個步驟的一個點,也可以是一個執行緒。
(3)在等待其他執行緒的主執行緒中,呼叫await()方法來等待其他呼叫了countDwon()的執行緒,直到計數器為0,再執行該執行緒接下來的邏輯,當然,如果某個執行緒執行的時間過久,當前執行緒不可能一直等待,那麼可以呼叫await(long time, TimeUnit unit)方法。
*注意:計數器大於0時才會阻塞當前執行緒,一旦計數器等於0就不會再阻塞呼叫await()的當前執行緒;
在建立CountDwonLatch時傳入計數器的初始值後,計數器就不能重新初始化了;
2、CyclicBarrier(同步屏障)
讓一組執行緒到達一個屏障(或者同步點)時被阻塞,知道所有執行緒到達同步屏障後,同步屏障開門,所有執行緒繼續執行。
同步屏障的使用:同步屏障有兩種使用方式,從CyclicBarrier的構造方式撒謊個可以看出來
//構造器 1
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//構造器 2
public CyclicBarrier(int parties) {
this(parties, null);
}
構造器1:傳入要阻塞的執行緒的數量parties和一個Runnable的物件,這個物件的作用是用於處理以下複雜的業務場景情形:當需要在第一個執行緒到達屏障前,前處理一個任務,這個任務可以寫在barrierAction中。
構造器2:runnable物件為null說明只需要簡單的等待其他執行緒到達同步屏障即可。
接下來舉例說明兩種同步屏障的使用:
構造器2:
(1)建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量;
(2)在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
import java.util.concurrent.CyclicBarrier;
class test{
static CyclicBarrier cb = new CyclicBarrier(2); //建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量;
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try{
cb.await(); //在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
System.out.println(1);
}
catch(Exception e){
System.out.println("thread 1");
}
}
}).start();
try{
cb.await(); //在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
System.out.println(3);
}
catch(Exception e){
System.out.println("main");
}
}
}
構造器1:
(1)建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量,和barrierAction物件;
(2)寫一個實現Runnable介面的class,用於實現第一個執行緒到達同步屏障前的業務邏輯;
(3)在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
import java.util.concurrent.CyclicBarrier;
class test{
static CyclicBarrier cb = new CyclicBarrier(2,new DoSomeThing()); //建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量,和barrierAction物件;
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try{
cb.await(); // 在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
System.out.println(1);
}
catch(Exception e){
System.out.println("thread 1");
}
}
}).start();
try{
cb.await();
System.out.println(2);
}
catch(Exception e){
System.out.println("main");
}
}
public static class DoSomeThing implements Runnable{ //寫一個實現Runnable介面的class,用於實現第一個執行緒到達同步屏障前的業務邏輯;
@Override
public void run(){
System.out.println("happen-before CyclicBarrier");
}
}
}
*注意:
因為cb.await( )方法會丟擲InterruptedException和BrokenBarrierException異常,因此在子執行緒中要使用try-catch方法來捕捉這兩種異常;
CyclicBarrier中的執行緒計數器可以使用reset()方法重置;
在所有執行緒到達同步屏障後,並不是所有執行緒“同時”開始執行,而是使各個執行緒的啟動時間降到最低;
3、Semaphore(訊號量)
Semaphore用於流量控制,用於控制訪問特定資源的執行緒數量,通過協調各個執行緒,以保證合理的使用公共資源。Semaphore就像公路上的交通訊號燈或者收費站,用於控制車的流量,如果一條公路只允許10 個車通行,相當於在路口設定一個頒發通行證的收費站,一輛車只有收到了通行證才能進入這條公路,如果通行證頒發完畢了,那麼其他的車就先等在收費站外面,如果公路上的車出了這條公路,那麼他把通行證還給收費站,收費站又可以繼續把通行證頒發給等在收費站外面的車。
Semaphore的使用方法:
(1)建立一個Semaphore物件,並傳入一個int型別的引數,初始化通行證數量;
(2)在要佔用公共資源的子執行緒業務邏輯前,呼叫s.acquire( )方法獲得通行證,在實現業務邏輯後,呼叫release()方法釋放通行證;
import java.util.concurrent.Semaphore;
class test{
static Semaphore s = new Semaphore(5); //建立一個Semaphore物件,並傳入一個int型別的引數,初始化通行證數量;
public static void main(String[] args) throws InterruptedException {
for(int i = 0; i<10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try{
s.acquire(); //在要佔用公共資源的子執行緒業務邏輯前,呼叫s.acquire( )方法獲得通行證
System.out.println(Thread.currentThread() + "is saving data " + "availablePermits:" + s.availablePermits() + " getQueueLength:" + s.getQueueLength());
Thread.sleep(1000);
s.release(); //在實現業務邏輯後,呼叫release()方法釋放通行證;
}
catch(Exception e){
System.out.println("thread 1");
}
}
}).start();
}
}
}
輸出結果:
Thread[Thread-3,5,main]is saving data availablePermits:4 getQueueLength:0
Thread[Thread-7,5,main]is saving data availablePermits:3 getQueueLength:0
Thread[Thread-0,5,main]is saving data availablePermits:2 getQueueLength:0
Thread[Thread-4,5,main]is saving data availablePermits:1 getQueueLength:0
Thread[Thread-8,5,main]is saving data availablePermits:0 getQueueLength:0
Thread[Thread-2,5,main]is saving data availablePermits:0 getQueueLength:4
Thread[Thread-6,5,main]is saving data availablePermits:0 getQueueLength:3
Thread[Thread-1,5,main]is saving data availablePermits:0 getQueueLength:2
Thread[Thread-5,5,main]is saving data availablePermits:0 getQueueLength:1
Thread[Thread-9,5,main]is saving data availablePermits:0 getQueueLength:0
可以發現,我們在程式碼中,有10個執行緒在執行,但是隻有5個執行緒併發的執行,剩餘的執行緒都在等待佇列中。
Semaphore還有一些其他的方法:
boolean tryAcquire()嘗試獲取通行證;
int getQueueLength( )獲取等待佇列(執行緒)的長度;
int availablePermits()獲取剩餘可用的通行證數量;
boolean QueuedThreads( )是否有執行緒在等待獲取通行證;
void reducePermits(int reduction)減少通行證數量,這是一個protedcted方法;
Cillection getQueuedThreads( )獲取等待獲取通行證的執行緒集合,這是個protected方法;
4、Exchanger(交換者)
Exchanger是一個執行緒間提供資料交換功能的寫作工具,他提供了一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。執行緒間通過呼叫excahange()方法交換資料,如果第一個執行緒先到達同步點,執行exchange方法,那麼他會一直在同步點等待第二個執行緒到達同步點,第二個執行緒也執行exchange方法,這時兩個執行緒都到達同步點,可以交換彼此的資料。
使用:
(1)建立一個Exchanger物件;
(2)在要交換(同步)資料的同步點呼叫excr.exchange( )方法
import java.util.concurrent.Exchanger;
class test{
static Exchanger<String> exc = new Exchanger<>(); //建立一個Exchanger物件;
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try{
String A= "銀行流水A";
String B = exc.exchange(A); //在要交換(同步)資料的同步點呼叫excr.exchange( )方法
System.out.println("A的視角: A、B流水是否一致:" + A.equals(B) + " A錄入的是:" + A + " B錄入的是:" + B);
}
catch(Exception e){
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try{
String B= "銀行流水B";
String A = exc.exchange(B); //在要交換(同步)資料的同步點呼叫excr.exchange( )方法
System.out.println("B的視角: A、B流水是否一致:" + A.equals(B) + " A錄入的是:" + A + " B錄入的是:" + B);
}
catch(Exception e){
e.printStackTrace();
}
}
}).start();
}
}
輸出:
B的視角: A、B流水是否一致:false A錄入的是:銀行流水A B錄入的是:銀行流水B
A的視角: A、B流水是否一致:false A錄入的是:銀行流水A B錄入的是:銀行流水B
可以看出,Exchanger的“交換”更偏向於資料的同步與共享,而不是“你的給我,我的給你”這樣有來有回的交換,是“你知道一個資訊,我知道另一個訊息,我們彼此交換了資訊,那麼我們就都知道了兩個訊息”。如果不願意在同步點一直等待另一個執行緒,那麼可以用設定等待時間的exchange方法:excr.exchange(V x, long timeout, timeUnit unit)。
*注意:
Exchanger交換資料是成對的交換;
Exchanger可以看做雙向的同步佇列,一個執行緒從個佇列頭部進行操作,一個從個佇列尾部進行操作;
二、併發工具類的使用場景
1、CountDwonLatch:一個執行緒等到其它幾個執行緒執行完才能執行的場景;
2、CyclicBarrier:需要多執行緒的計算結果最後對這些結果進行合併的場景;
3、Semaphore:公共資源有限而併發執行緒較多的場景;
4、Exchanger:需要資料交換共享的場景,例如遺傳演算法中,需要選擇兩個人來交配,交換兩人的資料並根據交換規則來得到交配結果,再例如用於校對工作,交換兩個執行緒的資料,用於校對兩個執行緒的資料是否相等;
三、併發工具類CountDwonLatch與CyclicBarrier的區別
1、CountDwonLatch阻塞一個執行緒,CyclicBarrier阻塞多個執行緒;
2、CountDwonLatch的計數器在構造物件時確定了就不能更改,但是CyclicBarrier可以使用reset()方法重置計數器;
3、CyclicBarrier的功能更加豐富,比如int getWaitingNumber()方法返回被阻塞在同步屏障的執行緒數,boolean isBroken()方法返回是否有阻塞的執行緒被中斷