1. 程式人生 > >Java併發工具類Phaser

Java併發工具類Phaser

Phaser由java7中推出,是Java SE 7中新增的一個使用同步工具,在功能上面它與CyclicBarrierCountDownLatch有些重疊,但是它提供了更加靈活、強大的用法。

CyclicBarrier,允許一組執行緒互相等待,直到到達某個公共屏障點。它提供的await()可以實現讓所有參與者在臨界點到來之前一直處於等待狀態。

CountDownLatch,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。它提供了await()、countDown()兩個方法來進行操作。

在Phaser中,它把多個執行緒協作執行的任務劃分為多個階段,程式設計時需要明確各個階段的任務,每個階段都可以有任意個參與者,執行緒都可以隨時註冊並參與到某個階段。

構造

Phaser建立後,初始階段編號為0,建構函式中指定初始參與個數。

註冊:Registration

Phaser支援通過register()和bulkRegister(int parties)方法來動態調整註冊任務的數量。

Arrival

每個Phaser例項都會維護一個phase number,初始值為0。每當所有註冊的任務都到達Phaser時,phase number累加,並在超過Integer.MAX_VALUE後清零。arrive()和arriveAndDeregister()方法用於記錄到達;其中arrive(),某個參與者完成任務後呼叫;arriveAndDeregister(),任務完成,取消自己的註冊。arriveAndAwaitAdvance(),自己完成等待其他參與者完成,進入阻塞,直到Phaser成功進入下個階段。

example 1

public class PhaserTest_1 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(5);
        
        for(int i = 0 ; i < 5 ; i++){
            Task_01 task_01 = new Task_01(phaser);
            Thread thread = new Thread(task_01, "PhaseTest_" + i);
            thread.start();
        }
    }
    
    static class Task_01 implements Runnable{
        private final Phaser phaser;
        
        public Task_01(Phaser phaser){
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "執行任務完成,等待其他任務執行......");
            //等待其他任務執行完成
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + "繼續執行任務...");
        }
    }
}

 

執行結果:

PhaseTest_0執行任務完成,等待其他任務執行......
PhaseTest_1執行任務完成,等待其他任務執行......
PhaseTest_3執行任務完成,等待其他任務執行......
PhaseTest_2執行任務完成,等待其他任務執行......
PhaseTest_4執行任務完成,等待其他任務執行......
PhaseTest_4繼續執行任務...
PhaseTest_1繼續執行任務...
PhaseTest_0繼續執行任務...
PhaseTest_2繼續執行任務...
PhaseTest_3繼續執行任務...

在該例項中我們可以確認,所有子執行緒的****+”繼續執行任務…”,都是線上程呼叫arriveAndAwaitAdvance()方法之後執行的。

example 2

前面提到過,Phaser提供了比CountDownLatch、CyclicBarrier更加強大、靈活的功能,從某種程度上來說,Phaser可以替換他們:

public class PhaserTest_5 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1);        //相當於CountDownLatch(1) 
        
        //五個子任務
        for(int i = 0 ; i < 3 ; i++){
            Task_05 task = new Task_05(phaser);
            Thread thread = new Thread(task,"PhaseTest_" + i);
            thread.start();
        }
        
        try {
            //等待3秒
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        phaser.arrive();        //countDownLatch.countDown()
    }
    
    static class Task_05 implements Runnable{
        private final Phaser phaser;
        
        Task_05(Phaser phaser){
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            phaser.awaitAdvance(phaser.getPhase());        //countDownLatch.await()
            System.out.println(Thread.currentThread().getName() + "執行任務...");
        }
    }
}

 

在這裡,任務一開始並沒有真正執行,而是等待三秒後執行。

對於CyclicBarrier就更加簡單了,直接arriveAndAwaitAdvance()方法替換,如example 1。

example 3

在CyclicBarrier中當任務執行完之後可以執行一個action,在Phaser中同樣有一個對應的action,只不過Phaser需要重寫onAdvance()方法:

public class PhaserTest_3 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3){
            /**
             * registeredParties:執行緒註冊的數量
             * phase:進入該方法的執行緒數,
             */
             protected boolean onAdvance(int phase, int registeredParties) { 
                 System.out.println("執行onAdvance方法.....;phase:" + phase + "registeredParties=" + registeredParties);
                 return phase == 3; 
             }
        };
        
        for(int i = 0 ; i < 3 ; i++){
            Task_03 task = new Task_03(phaser);
            Thread thread = new Thread(task,"task_" + i);
            thread.start();
        }
        while(!phaser.isTerminated()){
            phaser.arriveAndAwaitAdvance();    //主執行緒一直等待
        }
        System.out.println("主執行緒任務已經結束....");
    }
    
    static class Task_03 implements Runnable{
        private final Phaser phaser;
        
        public Task_03(Phaser phaser){
            this.phaser = phaser;
        }
        
        @Override
        public void run() {
            do{
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "開始執行任務...");
                phaser.arriveAndAwaitAdvance();
            }while(!phaser.isTerminated());
        }
    }
}

執行結果:

task_0開始執行任務...
task_1開始執行任務...
task_1執行onAdvance方法.....;phase:0registeredParties=3
task_2開始執行任務...
task_0開始執行任務...
task_1開始執行任務...
task_0執行onAdvance方法.....;phase:1registeredParties=3
task_2開始執行任務...
task_2執行onAdvance方法.....;phase:2registeredParties=3
主執行緒任務已經結束....
task_0開始執行任務...