Java執行緒--Phaser階段器
Phaser階段器
目錄
Phaser概念
Phaser表示“階段器”,用來解決控制多個執行緒分階段共同完成大型任務的情景問題。也就是說,事情一趟流程走下來,可以分為若干個階段,每個階段的工作都需要註冊該階段的N個人來幹。每人幹自己的活,手腳快的呢,就幹好到達第一階段的屏障前等待手腳慢的人幹完活後也到達第一階段屏障,...直至註冊該階段的N個人都各自幹完了一階段期間內的活,都到達了第一階段屏障,第一階段屏障開啟,准許這N個人通行,繼續進行下一階段...周而復始...直至所有階段都幹完,結束。
其作用相比CountDownLatch和CyclicBarrier更加靈活。
Phaser內有2個重要狀態:phase和party,有一個重要方法:onAdvance()
- phase階段,初值為0,當所有的執行緒執行完本輪任務,phase值自動加1,phase放行,執行緒可進入到下一階段。
- party註冊執行緒,party=3就意味著要在該階段註冊3個執行緒,Phaser物件當前管理著3個執行緒。
- boolean onAdvance(int phase, int registeredParties)方法,經常需要被過載,此方法有2個作用:
當每一個階段執行完畢,此方法會被呼叫,相當於CyclicBarrier的barrierAction。
當此方法返回true時,意味著Phaser被終止,因此過載此方法時將返回值是否設定為true來決定是否終止所有執行緒。
下面舉例業務場景:
倉庫來了一貨車的貨物要入庫,貨物很多,需要N個人(多執行緒)來幹,幹活有快有慢的,手腳快的人多勞多得
第一階段:註冊了3人,每人都從車上卸下貨物,3人將所有的貨物都卸貨完畢後,得以通過第一階段屏障,進入下一階段
第二階段:註冊了3人,每人都把貨物碼放托盤,3人將所有的貨物都碼託完畢後,得以通過第二階段屏障,進入下一階段
第三階段:註冊了3人,每人都把碼盤入倉存放,3人將所有的貨物都入倉完畢後,得以通過第三階段屏障,進入下一階段
結束
Phaser示例1
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
//任務
class TaskWms implements Runnable
{
private Phaser phase;
public TaskWms(Phaser phase){
this.phase = phase ;
}
public void run(){
/**
* 初始
*/
String tName = Thread.currentThread().getName();
/**
* 第0段:卸貨
*/
try{
long time = (long)(Math.random() * 5000);
Thread.sleep(time);
System.out.println(tName+"卸貨】耗時is:["+time+"]");
}catch(InterruptedException e){}
phase.arriveAndAwaitAdvance();//每個執行緒都到達後,通過當前階段,呼叫onAdvance()方法.
/**
* 第1段:碼託
*/
try{
long time = (long)(Math.random() * 5000);
Thread.sleep(time);
System.out.println(tName+"碼託】耗時is:["+time+"]");
}catch(InterruptedException e){}
phase.arriveAndAwaitAdvance();
/**
* 第2段:入庫
*/
try{
long time = (long)(Math.random() * 5000);
Thread.sleep(time);
System.out.println(tName+"入庫】耗時is:["+time+"]");
}catch(InterruptedException e){}
phase.arriveAndAwaitAdvance();
/**
* 結束:結賬領工錢
*/
System.out.println(tName+"結賬領工錢...");
phase.arriveAndDeregister();
}
}
/**
* 自定義階段器:過載onAdvance()方法:可表達每階段結束後的響應
* 注意Phaser階段器是從零開始的,
* 首次呼叫onAdvance()時,剛剛越過的階段屏障的值為0
*/
class MyPhaser extends Phaser
{
public boolean onAdvance(int p, int party){
switch(p){
case 0:
System.out.println("...卸貨完畢!\n\r");
return false;
case 1:
System.out.println("...碼託完畢!\n\r");
return false;
case 2:
System.out.println("...入庫完畢!\n\r");
return false;
case 3:
System.out.println("\n\r大家都已記賬結算工資!\n\r");
return true;
default:
return true;
}
}
}
/**
* 測試
*/
public class PhaserTest {
public static void main(String[] args) {
MyPhaser phase = new MyPhaser();
Thread[] threads = new Thread[3];
for (int i = 0; i < 3; i++) {
TaskWms task = new TaskWms(phase);
phase.register();
threads[i] = new Thread(task, "T"+i);
threads[i].start();
}
for (int i = 0; i < 3; i++) {
try {
threads[i].join();
}
catch (InterruptedException e) {
}
}
System.out.println("階段完畢:OK? Phaser has finished : "+phase.isTerminated());
}
}
程式結果如下:
T1卸貨】耗時is:[358]
T2卸貨】耗時is:[448]
T0卸貨】耗時is:[625]
...卸貨完畢!
T0碼託】耗時is:[38]
T1碼託】耗時is:[1785]
T2碼託】耗時is:[4536]
...碼託完畢!
T1入庫】耗時is:[414]
T0入庫】耗時is:[797]
T2入庫】耗時is:[2900]
...入庫完畢!
T2結賬領工錢...
T1結賬領工錢...
T0結賬領工錢...
大家都已記賬結算工資!
階段完畢:OK? Phaser has finished : true
Phaser示例2
此示例是針對上述Phaser示例1的變種:
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
//任務:3人卸貨
class TaskWms0 implements Runnable
{
private Phaser phase;
public TaskWms0(Phaser phase){
this.phase = phase ;
}
public void run(){
String tName = Thread.currentThread().getName();
try{
long time = (long)(Math.random() * 500);
Thread.sleep(time);
System.out.println(tName+"--卸貨】耗時is:["+time+"]");
}catch(InterruptedException e){}
phase.arriveAndDeregister();//每個執行緒都到達後,通過當前階段,呼叫onAdvance()方法,釋放註冊資訊.
}
}
//任務:1人碼託
class TaskWms1 implements Runnable
{
private Phaser phase;
public TaskWms1(Phaser phase){
this.phase = phase ;
}
public void run(){
String tName = Thread.currentThread().getName();
try{
long time = (long)(Math.random() * 500);
Thread.sleep(time);
System.out.println(tName+"--碼託】耗時is:["+time+"]");
}catch(InterruptedException e){}
phase.arriveAndDeregister();
}
}
//任務:2人入庫
class TaskWms2 implements Runnable
{
private Phaser phase;
public TaskWms2(Phaser phase){
this.phase = phase ;
}
public void run(){
String tName = Thread.currentThread().getName();
try{
long time = (long)(Math.random() * 500);
Thread.sleep(time);
System.out.println(tName+"--入庫】耗時is:["+time+"]");
}catch(InterruptedException e){}
phase.arriveAndDeregister();
}
}
/**
* 自定義階段器:過載onAdvance()方法:可表達每階段結束後的響應
* 注意Phaser階段器是從零開始的,
* 首次呼叫onAdvance()時,剛剛越過的階段屏障的值為0
*/
class MyPhaser extends Phaser
{
public MyPhaser(int parties){
super(parties);
}
public MyPhaser(){
super();
}
public boolean onAdvance(int p, int party){
switch(p){
case 0:
System.out.println("階段"+p+":卸貨完畢!\n\r");
return false;
case 1:
System.out.println("階段"+p+":碼託完畢!\n\r");
return false;
case 2:
System.out.println("階段"+p+":入庫完畢!\n\r");
return true;
default:
return true;
}
}
}
/**
* 測試
*/
public class PhaserTest {
public static void main(String[] args) {
MyPhaser phase = new MyPhaser(1);
/**
* 等價於:這麼兩句合在一起的意思
* MyPhaser phase = new MyPhaser();
* phase.register();
*/
/**
* register()註冊幾次,
* 就要出現幾次arrive()或arriveAndAwaitAdvance()或arriveAndDeregister(),
* 才能讓屏障開啟,階段繼續
*/
char c = 'G';
/**
* 卸貨任務3人
*/
String phaseAfterStr = phase.getPhase()==0?"":"階段屏障("+(phase.getPhase()-1)+")之後";
String phaseBeforeStr = "【.任務執行中.】階段屏障("+phase.getPhase()+")之前";
System.out.println(phaseAfterStr+phaseBeforeStr);
Thread[] threads0 = new Thread[3];
for (int i = 0; i < 3; i++) {
TaskWms0 task = new TaskWms0(phase);
phase.register();
threads0[i] = new Thread(task, "卸"+(c++));
threads0[i].start();
}
phase.arriveAndAwaitAdvance(); //設定屏障,卸貨完成,才往下繼續執行
/**
* 碼託任務1人
*/
phaseAfterStr = phase.getPhase()==0?"":"階段屏障("+(phase.getPhase()-1)+")之後";
phaseBeforeStr = "【.任務執行中.】階段屏障("+phase.getPhase()+")之前";
System.out.println(phaseAfterStr+phaseBeforeStr);
Thread[] threads1 = new Thread[1];
for (int i = 0; i < 1; i++) {
TaskWms1 task = new TaskWms1(phase);
phase.register();
threads1[i] = new Thread(task, "碼"+(c++));
threads1[i].start();
}
phase.arriveAndAwaitAdvance(); //設定屏障,碼託完成,才往下繼續執行
/**
* 入庫任務2人
*/
phaseAfterStr = phase.getPhase()==0?"":"階段屏障("+(phase.getPhase()-1)+")之後";
phaseBeforeStr = "【.任務執行中.】階段屏障("+phase.getPhase()+")之前";
System.out.println(phaseAfterStr+phaseBeforeStr);
Thread[] threads2 = new Thread[2];
for (int i = 0; i < 2; i++) {
TaskWms2 task = new TaskWms2(phase);
phase.register();
threads2[i] = new Thread(task, "庫"+(c++));
threads2[i].start();
}
phase.arriveAndAwaitAdvance(); //設定屏障,入庫完成,才往下繼續執行
System.out.println("階段完畢:OK? Phaser has finished : "+phase.isTerminated());
}
}
程式執行結果如下:
【.任務執行中.】階段屏障(0)之前
卸I--卸貨】耗時is:[151]
卸H--卸貨】耗時is:[252]
卸G--卸貨】耗時is:[300]
階段0:卸貨完畢!
階段屏障(0)之後【.任務執行中.】階段屏障(1)之前
碼J--碼託】耗時is:[181]
階段1:碼託完畢!
階段屏障(1)之後【.任務執行中.】階段屏障(2)之前
庫K--入庫】耗時is:[343]
庫L--入庫】耗時is:[361]
階段2:入庫完畢!
階段完畢:OK? Phaser has finished : true
Phaser比CountDownLatch靈活
以下示例:演示一個執行緒等待一組執行緒
/**
* if you want change the number of parties, you should create a new instance.
*/
/**
* 倒計時器 CountDownLatch(int parties)
*
* 初始化必須指定好執行緒個數,且以後無法修改
* 呼叫countDown()方法1次,遞減1次,直至降為0,喚醒呼叫await()方法的所有執行緒
* 是一次性用品,用完就扔,無法再用
* 一個執行緒和一組執行緒之間的通訊:一個等一組:一執行緒呼叫awaitAdvance(0),組執行緒呼叫arrive()
*/
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class PhaserTest_5 {
public static void main(String[] args) {
/**
* 類似:CountDownLatch(3)
*/
Phaser phaser = new Phaser(3);
System.out.println("main blocking...!\n\r");
//3個任務
for(int i = 0 ; i < 3 ; i++){
Task_05 task = new Task_05(phaser);
Thread thread = new Thread(task,"T" + i);
thread.start();
}
/**
* 類似:countDownLatch.await()
*/
phaser.awaitAdvance(0);
System.out.println("\n\rmain over OK!");
}
static class Task_05 implements Runnable{
private final Phaser phaser;
Task_05(Phaser phaser){
this.phaser = phaser;
}
public void run() {
try{
String tName = Thread.currentThread().getName();
long time = (long)(Math.random() * 500);
Thread.sleep(time);
System.out.println(tName+"--執行任務】耗時is:["+time+"]");
}catch(Exception e){}
/**
* 類似:countDownLatch.countDown()
*/
phaser.arrive();
}
}
}
程式執行結果如下:
main blocking...!
T1--執行任務】耗時is:[88]
T2--執行任務】耗時is:[106]
T0--執行任務】耗時is:[373]
main over OK!
以下示例:演示一組執行緒等待一個執行緒
/**
* if you want change the number of parties, you should create a new instance.
*/
/**
* 倒計時器 CountDownLatch(int parties)
*
* 初始化必須指定好執行緒個數,且以後無法修改
* 呼叫countDown()方法1次,遞減1次,直至降為0,喚醒呼叫await()方法的所有執行緒
* 是一次性用品,用完就扔,無法再用
* 一個執行緒和一組執行緒之間的通訊:一組等一個:一執行緒呼叫arrive(),組執行緒呼叫awaitAdvance(0)
*/
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class PhaserTest_5 {
public static void main(String[] args) {
/**
* 類似:CountDownLatch(1)
*/
Phaser phaser = new Phaser(1);
//3個任務
for(int i = 0 ; i < 3 ; i++){
Task_05 task = new Task_05(phaser);
Thread thread = new Thread(task,"T" + i);
thread.start();
}
try{
long time = (long)(Math.random() * 1000);
Thread.sleep(time);
System.out.println("單一執行緒】耗時is:["+time+"]\n\r");
}catch(Exception e){}
/**
* 類似:countDownLatch.countDown()
*/
phaser.arrive();
}
static class Task_05 implements Runnable{
private final Phaser phaser;
Task_05(Phaser phaser){
this.phaser = phaser;
}
public void run() {
/**
* 類似:countDownLatch.await()
*/
phaser.awaitAdvance(0);
String tName = Thread.currentThread().getName();
System.out.println("單一執行緒已到達,組內執行緒【"+tName+"】--我得以執行任務!");
}
}
}
程式執行結果如下:
單一執行緒】耗時is:[446]
單一執行緒已到達,組內執行緒【T0】--我得以執行任務!
單一執行緒已到達,組內執行緒【T1】--我得以執行任務!
單一執行緒已到達,組內執行緒【T2】--我得以執行任務!
Phaser比CyclicBarrier靈活
CyclicBarrier指定的執行緒參與個數,是初始化時定死的
Phaser既可以初始化的時候指定執行緒參與個數;也可以初始化時不指定,而由後續的register()方法來動態註冊參與執行緒
/**
* if you want change the number of parties, you should create a new instance.
*/
/**
* 迴圈屏障 CyclicBarrier(int parties)
*
* 初始化必須指定好執行緒個數,且以後無法修改,可以理解為一組執行緒,組內執行緒總個數為parties
* 呼叫await()方法1次,遞減1次,直至降為0,喚醒呼叫await()方法的所有執行緒,恢復parties
* 是迴圈用品,可以反覆使用
* 一組執行緒內的執行緒彼此等待
*
* CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
*/
/**
* 程式碼演示:用本文的Phaser示例1改造下,同理:他們可以相互改造
*
* Phaser類的arriveAndAwaitAdvance()方法
* 功效雷同
* CyclicBarrier類的await()方法
*/
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
//任務
class TaskWms implements Runnable
{
private CyclicBarrier barrier;
public TaskWms(CyclicBarrier barrier){
this.barrier = barrier ;
}
public void run(){
/**
* 初始
*/
String tName = Thread.currentThread().getName();
/**
* 第0段:卸貨
*/
try{
long time = (long)(Math.random() * 5000);
Thread.sleep(time);
System.out.println(tName+"卸貨】耗時is:["+time+"]");
barrier.await();//每個執行緒都到達後,通過當前屏障,繼續往下執行.
}catch(Exception e){}
/**
* 第1段:碼託
*/
try{
long time = (long)(Math.random() * 5000);
Thread.sleep(time);
System.out.println(tName+"碼託】耗時is:["+time+"]");
barrier.await();
}catch(Exception e){}
/**
* 第2段:入庫
*/
try{
long time = (long)(Math.random() * 5000);
Thread.sleep(time);
System.out.println(tName+"入庫】耗時is:["+time+"]");
barrier.await();
}catch(Exception e){}
/**
* 結束:結賬領工錢
*/
System.out.println(tName+"結賬領工錢...");
}
}
/**
* 測試
*/
public class PhaserTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3);
Thread[] threads = new Thread[3];
for (int i = 0; i < 3; i++) {
TaskWms task = new TaskWms(barrier);
threads[i] = new Thread(task, "T"+i);
threads[i].start();
}
for (int i = 0; i < 3; i++) {
try {
threads[i].join();
}
catch (InterruptedException e) {
}
}
System.out.println("完畢:OK");
}
}
程式執行結果如下:
T1卸貨】耗時is:[1702]
T2卸貨】耗時is:[2234]
T0卸貨】耗時is:[3078]
T1碼託】耗時is:[2159]
T2碼託】耗時is:[3881]
T0碼託】耗時is:[4503]
T0入庫】耗時is:[2541]
T1入庫】耗時is:[3614]
T2入庫】耗時is:[4827]
T2結賬領工錢...
T1結賬領工錢...
T0結賬領工錢...
完畢:OK
Phaser總結
再次引用別人比較好的一篇部落格總結:Phaser總結
在JAVA 1.7引入了一個新的併發API:Phaser,一個可重用的同步barrier。在此前,JAVA已經有CyclicBarrier、CountDownLatch這兩種同步barrier,但是Phaser更加靈活,而且側重於“階段推進”。
一、簡述
1、註冊機制:
與其他barrier不同的是,Phaser中的“註冊的同步者(parties)”會隨時間而變化,Phaser可以通過構造器初始化parties個數,也可以在Phaser執行期間隨時加入(register)新的parties,以及在執行期間登出(deregister)parties。執行時可以隨時加入、登出parties,只會影響Phaser內部的計數器,它建立任何內部的bookkeeping(賬本),因此task任務執行緒不能查詢自己是否已經註冊,當然你可以通過實現子類來達成這一設計要求。
//虛擬碼
/**
* 每個階段成對出現 register() 和 arrive()[或arriveAndAwaitAdvance()][或arriveAndDeregister()]
*/
Phaser phaser = new Phaser();
phaser.register(); //parties count: 1
....
phaser.arriveAndDeregister(): //parties count : 0;
....
/**
* register()方法不是必須的:有簡略的註冊機制,即初始化Phaser時,給出併發執行緒個數parties的值
*/
Phaser phaser = new Phaser(3);
//phaser.register(); //不需要
//phaser.register(); //不需要
//phaser.register(); //不需要
此外,CyclicBarrier、CountDownLatch需要在初始化的建構函式中指定同步者的個數,且執行時無法再次調整。
//虛擬碼
/**
* if you want change the number of parties, you should create a new instance.
*/
/**
* 倒計時器 CountDownLatch(int parties)
*
* 初始化必須指定好執行緒個數,且以後無法修改
* 呼叫countDown()方法1次,遞減1次,直至降為0,喚醒呼叫await()方法的所有執行緒
* 是一次性用品,用完就扔,無法再用
* 一個執行緒和一組執行緒之間的通訊:一個等一組:一執行緒呼叫await(),組執行緒呼叫countDown()
* 一組執行緒和一個執行緒之間的通訊:一組等一個:組執行緒呼叫await(),一執行緒呼叫countDown()
*/
CountDownLatch countDownLatch = new CountDownLatch(12);
/**
* 迴圈屏障 CyclicBarrier(int parties)
*
* 初始化必須指定好執行緒個數,且以後無法修改,可以理解為一組執行緒,組內執行緒總個數為parties
* 呼叫await()方法1次,遞減1次,直至降為0,喚醒呼叫await()方法的所有執行緒,恢復parties
* 是迴圈用品,可以反覆使用
* 一組執行緒內的執行緒彼此等待
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(12);
2、同步機制:
類似於CyclicBarrier,arrivedAndAwaitAdvance()方法的效果類似於CyclicBarrier的await()。Phaser的每個週期(generation)都有一個phase數字,phase 從0開始,當所有的已註冊的parties都到達後(arrive)將會導致此phase數字自增1,階段向前推進。這個phase數字用於表示當前parties所處於的“階段週期”,它既可以標記和控制parties的wait行為、喚醒等待的時機。
2.1)Arrival:Phaser中的arrive()、arriveAndDeregister()方法,這兩個方法不會阻塞(block),但是會返回相應的phase數字,當此phase中最後一個party也arrive以後,phase數字將會增加,即phase進入下一個週期,同時觸發(onAdvance)那些阻塞在上一phase的執行緒。這一點類似於CyclicBarrier的barrier到達機制;更靈活的是,我們可以通過重寫onAdvance方法來實現更多的觸發行為。
2.2)Waiting:Phaser中的awaitAdvance()方法,需要指定一個phase數字,表示此Thread阻塞直到phase推進到此週期,arriveAndAwaitAdvance()方法阻塞到下一週期開始(或者當前phase結束)。不像CyclicBarrier,即使等待Thread已經interrupted,awaitAdvance方法會繼續等待。Phaser提供了Interruptible和Timout的阻塞機制,不過當執行緒Interrupted或者timout之後將會丟擲異常,而不會修改Phaser的內部狀態。如果必要的話,你可以在遇到此類異常時,進行相應的恢復操作,通常是在呼叫forceTermination()方法之後。
Phaser通常在ForJoinPool中執行tasks,它可以在有task阻塞等待advance時,確保其他tasks的充分並行能力。
3、中斷(終止):
Phaser可以進入Termination狀態,可以通過isTermination()方法判斷;當Phaser被終止後,所有的同步方法將會立即返回(解除阻塞),不需要等到advance(即advance也會解除阻塞),且這些阻塞方法將會返回一個負值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。當然,向一個termination狀態的Phaser註冊party將不會有效;此時onAdvance()方法也將會返回true(預設實現),即所有的parties都會被deregister,即register個數為0。
4、Tiering(分層):
Phaser可以“分層”,以tree的方式構建Phaser來降低“競爭”。如果一個Phaser中有大量parties,這會導致嚴重的同步競爭,所以我們可以將它們分組並共享一個parent Phaser,這樣可以提高吞吐能力;Phaser中註冊和登出parties都會有Child 和parent Phaser自動管理。當Child Phaser中中註冊的parties變為非0時(在建構函式Phaser(Phaser parent,int parties),或者register()方法),Child Phaser將會註冊到其Parent上;當Child Phaser中的parties變為0時(比如由arrivedAndDegister()方法),那麼此時Child Phaser也將從其parent中登出出去。
5、監控:
同步的方法只會被register操作呼叫,對於當前state的監控方法可以在任何時候呼叫,比如getRegisteredParties()獲取已經註冊的parties個數,getPhase()獲取當前phase週期數等;因為這些方法並非同步,所以只能反映當時的瞬間狀態。
二、常用的Barrier比較
1、CountDownLatch
//初始化parties執行緒個數
int parties = 12;
CountDownLatch latch = new CountDownLatch(parties);
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {
executor.execute(new Runnable() {
public void run() {
try {
Thread.sleep(3000);
} catch (Exception e) {}
finally {
//當前任務執行完,倒計數器減1,直至所有任務都完成,倒計數器為0,喚醒主執行緒
latch.countDown();
}
}
});
}
latch.await();//主執行緒阻塞,直到所有的parties到達
executor.shutdown();
2、CyclicBarrier
//初始化parties執行緒個數
int parties = 12;
CyclicBarrier barrier = new CyclicBarrier(parties);
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {
executor.execute(new Runnable() {
public void run() {
try {
int i = 0;
while (i < 3 && !barrier.isBroken()) {
System.out.println("tid:"+Thread.currentThread().getId());
Thread.sleep(3000);
//如果所有的parties都到達,則開啟新的一次週期(generation),barrier可以被重用
barrier.await();
i++;
}
}
catch (Exception e) {}
finally {}
}
});
}
Thread.sleep(100000);
三、API
1、Phaser():建構函式,建立一個Phaser;預設parties個數為0。此後我們可以通過register()、bulkRegister()方法來註冊新的parties。每個Phaser例項內部,都持有幾個狀態資料:termination狀態、已經註冊的parties個數(registeredParties)、當前phase下已到達的parties個數(arrivedParties)、當前phase週期數,還有2個同步阻塞佇列Queue。Queue中儲存了所有的waiter,即因為advance而等待的執行緒資訊;這兩個Queue分別為evenQ和oddQ,這兩個Queue在實現上沒有任何區別,Queue的元素為QNode,每個QNode儲存一個waiter的資訊,比如Thread引用、阻塞的phase、超時的deadline、是否支援interrupted響應等。兩個Queue,其中一個儲存當前phase中正在使用的waiter,另一個備用,當phase為奇數時使用evenQ、oddQ備用,偶數時相反,即兩個Queue輪換使用。當advance事件觸發期間,新register的parties將會被放在備用的Queue中,advance只需要響應另一個Queue中的waiters即可,避免出現混亂。
2、Phaser(int parties):建構函式,初始一定數量的parties;相當於直接regsiter此數量的parties。
3、arrive():到達,阻塞,等到當前phase下其他parties到達。如果沒有register(即已register數量為0),呼叫此方法將會丟擲異常,此方法返回當前phase週期數,如果Phaser已經終止,則返回負數。
4、arriveAndDeregister():到達,並登出一個parties數量,非阻塞方法。登出,將會導致Phaser內部的parties個數減一(隻影響當前phase),即下一個phase需要等待arrive的parties數量將減一。異常機制和返回值,與arrive方法一致。
5、arriveAndAwaitAdvance():到達,且阻塞直到其他parties都到達,且advance。此方法等同於awaitAdvance(arrive())。如果你希望阻塞機制支援timeout、interrupted響應,可以使用類似的其他方法(參見下文)。如果你希望到達後且登出,而且阻塞等到當前phase下其他的parties到達,可以使用awaitAdvance(arriveAndDeregister())方法組合。此方法的異常機制和返回值同arrive()。
6、awaitAdvance(int phase):阻塞方法,等待phase週期數下其他所有的parties都到達。如果指定的phase與Phaser當前的phase不一致,則立即返回。
7、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支援interrupted響應,即waiter執行緒如果被外部中斷,則此方法立即返回,並丟擲InterrutedException。
8、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支援timeout型別的interrupted響應,即當前執行緒阻塞等待約定的時長,超時後以TimeoutException異常方式返回。
9、forceTermination():強制終止,此後Phaser物件將不可用,即register等將不再有效。此方法將會導致Queue中所有的waiter執行緒被喚醒。
10、register():新註冊一個party,導致Phaser內部registerPaties數量加1;如果此時onAdvance方法正在執行,此方法將會等待它執行完畢後才會返回。此方法返回當前的phase週期數,如果Phaser已經中斷,將會返回負數。
11、bulkRegister(int parties):批量註冊多個parties陣列,規則同10、。
12、getArrivedParties():獲取已經到達的parties個數。
13、getPhase():獲取當前phase週期數。如果Phaser已經中斷,則返回負值。
14、getRegisteredParties():獲取已經註冊的parties個數。
15、getUnarrivedParties():獲取尚未到達的parties個數。
16、onAdvance(int phase,int registeredParties):這個方法比較特殊,表示當進入下一個phase時可以進行的事件處理,如果返回true表示此Phaser應該終止(此後將會把Phaser的狀態為termination,即isTermination()將返回true。),否則可以繼續進行。phase引數表示當前週期數,registeredParties表示當前已經註冊的parties個數。
預設實現為:return registeredParties == 0;在很多情況下,開發者可以通過重寫此方法,來實現自定義的advance時間處理機制。
內部原理,比較簡單(簡述):
1)兩個計數器:分別表示parties個數和當前phase。register和deregister會觸發parties變更(CAS),全部parties到達(arrive)會觸發phase變更。
2)一個主要的阻塞佇列:非AQS實現,對於arriveAndWait的執行緒,會被新增到佇列中並被park阻塞,知道當前phase中最後一個party到達後觸發喚醒。