Java併發——Phaser “階段器”
1. Phaser
Phaser是一個更加彈性的同步屏障。類java.util.concurrent實現了Phaser.
這段文字轉自:https://blog.csdn.net/u010739551/article/details/51083004
Phaser表示“階段器”,用來解決控制多個執行緒分階段共同完成任務的情景問題。其作用相比CountDownLatch和CyclicBarrier更加靈活,例如有這樣的一個題目:5個學生一起參加考試,一共有三道題,要求所有學生到齊才能開始考試,全部同學都做完第一題,學生才能繼續做第二題,全部學生做完了第二題,才能做第三題,所有學生都做完的第三題,考試才結束。分析這個題目:這是一個多執行緒(5個學生)分階段問題(考試考試、第一題做完、第二題做完、第三題做完),所以很適合用Phaser解決這個問題。
2. 常用方法
(1)Phaser(int parties)建構函式
與CountDownLatch一樣,傳入同步的執行緒數,也支援層次構造Phaser(Phaser parent)。
(2)register()
動態新增一個或多個參與者,同時返回phase值作抵達分類用。
(3)bulkRegister(int parties)
將指定數目的參與者註冊到phaser中,所有這些新的參與者都將被當成沒有執行完本階段的執行緒。
(4)int arriveAndAwaitAdvance()
類似await()方法,記錄到達執行緒數,阻塞等待其他執行緒到達同步點後再繼續執行。
(5)arriveAndDeregister()
動態撤銷執行緒在phaser的註冊,通知phaser物件,該執行緒已經結束該階段且不參與後面階段。由此減少了未來phase上需要前進的執行緒數量。
(6)arrive()
通知phaser該執行緒已經完成該階段,但不等待其他執行緒。必須小心使用這個方法,因為它不會與其他執行緒同步。
(7)forceTermination()
強制phaser進入終止狀態,不管是否存在未註冊的參與執行緒,當一個執行緒出現錯誤時,強制終止phaser是很有意義的。
(8)boolean onAdvance(int phase, int registeredParties)
當每一個階段執行完畢,此方法會被自動呼叫,因此,過載此方法寫入的程式碼會在每個階段執行完畢時執行,相當於CyclicBarrier的barrierAction。
當此方法返回true時,意味著Phaser被終止,因此可以巧妙的設定此方法的返回值來終止所有執行緒。
(9)在Phaser內有2個重要狀態,分別是phase和party。
phase就是階段,初值為0,當所有的執行緒執行完本輪任務,同時開始下一輪任務時,意味著當前階段已結束,進入到下一階段,phase的值自動加1。
party就是執行緒, party=4就意味著Phaser物件當前管理著4個執行緒。
phaser有一個重大特性,就是不必對它的方法進行異常處理。置於休眠的執行緒不會響應中斷事件,不會丟擲interruptedException異常, 只有一個方法會響應:AwaitAdvanceInterruptibly(int phaser).
3.示例
5個學生一起參加考試,一共有三道題,要求所有學生到齊才能開始考試,全部同學都做完第一題,學生才能繼續做第二題,全部學生做完了第二題,才能做第三題,所有學生都做完的第三題,考試才結束。分析這個題目:這是一個多執行緒(5個學生)分階段問題(考試考試、第一題做完、第二題做完、第三題做完),所以很適合用Phaser解決這個問題。
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserDemo {
public static void main(String[] args)
{
MyPhaser phaser = new MyPhaser();
StudentTask[] tasks = new StudentTask[5];
for (int i = 0; i < tasks.length; i++) {
tasks[i] = new StudentTask(phaser);
phaser.register(); //註冊一次表示phaser維護的執行緒個數
}
Thread[] threads = new Thread[tasks.length];
for (int i = 0; i < tasks.length; i++) {
threads[i] = new Thread(tasks[i], "Student "+i);
threads[i].start();
}
//等待所有執行緒執行結束
for (int i = 0; i < tasks.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Phaser has finished:"+phaser.isTerminated());
}
}
class StudentTask implements Runnable{
private Phaser phaser;
public StudentTask(Phaser phaser) {
this.phaser = phaser;
}
Random random = new Random();
@Override
public void run()
{
System.out.println(Thread.currentThread().getName()+"已做好準備");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"開始做第1題");
int time1 = doExercise1();
System.out.println(Thread.currentThread().getName()+"完成第1題,做題時間:" + time1);
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"開始做第2題");
int time2 = doExercise2();
System.out.println(Thread.currentThread().getName()+"完成第2題,做題時間:" + time2);
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"開始做第3題");
int time3 = doExercise3();
System.out.println(Thread.currentThread().getName()+"完成第3題,做題時間:" + time3);
phaser.arriveAndAwaitAdvance();
}
private int doExercise1() {
int time1 = random.nextInt(100);
try {
TimeUnit.SECONDS.sleep(time1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return time1;
}
private int doExercise2() {
int time2 = random.nextInt(100);
try {
TimeUnit.SECONDS.sleep(time2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return time2;
}
private int doExercise3() {
int time3 = random.nextInt(100);
try {
TimeUnit.SECONDS.sleep(time3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return time3;
}
}
class MyPhaser extends Phaser{
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
switch(phase) {
case 0:
return studentArrived();
case 1:
return finishFirstExercise();
case 2:
return finishSecondExercise();
case 3:
return finishExam();
default:
return true;
}
}
private boolean studentArrived() {
System.out.println("全部考生都已做好準備,考生人數:" + getRegisteredParties());
return false;
}
private boolean finishFirstExercise() {
System.out.println("所有考生都已完成第1題");
return false;
}
private boolean finishSecondExercise() {
System.out.println("所有考生都已完成第2題");
return false;
}
private boolean finishExam() {
System.out.println("所有考生都已完成第3題,結束考試!");
return true;
}
}
執行結果:
Student 1已做好準備
Student 4已做好準備
Student 2已做好準備
Student 3已做好準備
Student 0已做好準備
全部考生都已做好準備,考生人數:5
Student 0開始做第1題
Student 4開始做第1題
Student 3開始做第1題
Student 1開始做第1題
Student 2開始做第1題
Student 2完成第1題,做題時間:32
Student 0完成第1題,做題時間:59
Student 3完成第1題,做題時間:60
Student 1完成第1題,做題時間:69
Student 4完成第1題,做題時間:96
所有考生都已完成第1題
Student 0開始做第2題
Student 1開始做第2題
Student 4開始做第2題
Student 2開始做第2題
Student 3開始做第2題
Student 2完成第2題,做題時間:8
Student 0完成第2題,做題時間:38
Student 3完成第2題,做題時間:69
Student 1完成第2題,做題時間:73
Student 4完成第2題,做題時間:87
所有考生都已完成第2題
Student 1開始做第3題
Student 3開始做第3題
Student 2開始做第3題
Student 0開始做第3題
Student 4開始做第3題
Student 2完成第3題,做題時間:23
Student 1完成第3題,做題時間:25
Student 0完成第3題,做題時間:25
Student 4完成第3題,做題時間:37
Student 3完成第3題,做題時間:57
所有考生都已完成第3題,結束考試!
Phaser has finished:true