1. 程式人生 > >Java併發——Phaser “階段器”

Java併發——Phaser “階段器”

1. Phaser

Phaser是一個更加彈性的同步屏障。類java.util.concurrent實現了Phaser.

這段文字轉自:https://blog.csdn.net/u010739551/article/details/51083004 

Phaser表示“階段器”,用來解決控制多個執行緒分階段共同完成任務的情景問題。其作用相比CountDownLatch和CyclicBarrier更加靈活,例如有這樣的一個題目:5個學生一起參加考試,一共有三道題,要求所有學生到齊才能開始考試,全部同學都做完第一題,學生才能繼續做第二題,全部學生做完了第二題,才能做第三題,所有學生都做完的第三題,考試才結束。分析這個題目:這是一個多執行緒(5個學生)分階段問題(考試考試、第一題做完、第二題做完、第三題做完),所以很適合用Phaser解決這個問題。

2. 常用方法

(1)Phaser(int parties)建構函式

與CountDownLatch一樣,傳入同步的執行緒數,也支援層次構造Phaser(Phaser parent)。

(2)register()

動態新增一個或多個參與者,同時返回phase值作抵達分類用。

(3)bulkRegister(int parties)

將指定數目的參與者註冊到phaser中,所有這些新的參與者都將被當成沒有執行完本階段的執行緒。

(4)int arriveAndAwaitAdvance()

類似await()方法,記錄到達執行緒數,阻塞等待其他執行緒到達同步點後再繼續執行。

(5)arriveAndDeregister()

動態撤銷執行緒在phaser的註冊,通知phaser物件,該執行緒已經結束該階段且不參與後面階段。由此減少了未來phase上需要前進的執行緒數量。

(6)arrive()

通知phaser該執行緒已經完成該階段,但不等待其他執行緒。必須小心使用這個方法,因為它不會與其他執行緒同步。

(7)forceTermination()

強制phaser進入終止狀態,不管是否存在未註冊的參與執行緒,當一個執行緒出現錯誤時,強制終止phaser是很有意義的。

(8)boolean onAdvance(int phase, int registeredParties)

方法。此方法有2個作用:
        當每一個階段執行完畢,此方法會被自動呼叫,因此,過載此方法寫入的程式碼會在每個階段執行完畢時執行,相當於CyclicBarrier的barrierAction。
        當此方法返回true時,意味著Phaser被終止,因此可以巧妙的設定此方法的返回值來終止所有執行緒。

(9)在Phaser內有2個重要狀態,分別是phase和party。
        phase就是階段,初值為0,當所有的執行緒執行完本輪任務,同時開始下一輪任務時,意味著當前階段已結束,進入到下一階段,phase的值自動加1。

         party就是執行緒, party=4就意味著Phaser物件當前管理著4個執行緒。
 

phaser有一個重大特性,就是不必對它的方法進行異常處理。置於休眠的執行緒不會響應中斷事件,不會丟擲interruptedException異常, 只有一個方法會響應:AwaitAdvanceInterruptibly(int phaser).

3.示例

5個學生一起參加考試,一共有三道題,要求所有學生到齊才能開始考試,全部同學都做完第一題,學生才能繼續做第二題,全部學生做完了第二題,才能做第三題,所有學生都做完的第三題,考試才結束。分析這個題目:這是一個多執行緒(5個學生)分階段問題(考試考試、第一題做完、第二題做完、第三題做完),所以很適合用Phaser解決這個問題。
 

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserDemo {

	public static void main(String[] args)
	{
		MyPhaser phaser = new MyPhaser();
		StudentTask[] tasks = new StudentTask[5];
		for (int i = 0; i < tasks.length; i++) {
			tasks[i] = new StudentTask(phaser);
			phaser.register();	//註冊一次表示phaser維護的執行緒個數
		}
		Thread[] threads = new Thread[tasks.length];
		for (int i = 0; i < tasks.length; i++) {
			threads[i] = new Thread(tasks[i], "Student "+i);
			threads[i].start();
		}

		//等待所有執行緒執行結束
		for (int i = 0; i < tasks.length; i++) {
			try {
				threads[i].join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		System.out.println("Phaser has finished:"+phaser.isTerminated());		
		
	}
}

class StudentTask implements Runnable{
	private Phaser phaser;
	
	public StudentTask(Phaser phaser) {
		this.phaser = phaser;
	}
	
	Random random = new Random();
	@Override
	public void run()
	{
		System.out.println(Thread.currentThread().getName()+"已做好準備");
		phaser.arriveAndAwaitAdvance();
		
		System.out.println(Thread.currentThread().getName()+"開始做第1題");
		int time1 = doExercise1();
		System.out.println(Thread.currentThread().getName()+"完成第1題,做題時間:" + time1);
		phaser.arriveAndAwaitAdvance();
		
		System.out.println(Thread.currentThread().getName()+"開始做第2題");
		int time2 = doExercise2();
		System.out.println(Thread.currentThread().getName()+"完成第2題,做題時間:" + time2);
		phaser.arriveAndAwaitAdvance();
		
		System.out.println(Thread.currentThread().getName()+"開始做第3題");
		int time3 = doExercise3();
		System.out.println(Thread.currentThread().getName()+"完成第3題,做題時間:" + time3);
		phaser.arriveAndAwaitAdvance();	
		
	}

	private int doExercise1() {
		int time1 = random.nextInt(100);
		try {
			TimeUnit.SECONDS.sleep(time1);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return time1;
	}
	
	private int doExercise2() {
		int time2 = random.nextInt(100);
		try {
			TimeUnit.SECONDS.sleep(time2);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return time2;
	}
	
	private int doExercise3() {
		int time3 = random.nextInt(100);
		try {
			TimeUnit.SECONDS.sleep(time3);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return time3;
	}
	
}

class MyPhaser extends Phaser{
	 @Override
	 protected boolean onAdvance(int phase, int registeredParties)
	 {
		 switch(phase) {
		     case 0:
		    	 return studentArrived();
		     case 1:
		    	 return finishFirstExercise();
		     case 2:
		    	 return finishSecondExercise();
		     case 3:
		    	 return finishExam();
		     default:
		    	 return true;
		 }	 		 
	 }
	 
	 private boolean studentArrived() {
		 System.out.println("全部考生都已做好準備,考生人數:" + getRegisteredParties());
		 return false;
	}
	 
	 private boolean finishFirstExercise() {
			System.out.println("所有考生都已完成第1題");
			return false;
		}


	 private boolean finishSecondExercise() {
			System.out.println("所有考生都已完成第2題");
			return false;
		}

	 private boolean finishExam() {
			System.out.println("所有考生都已完成第3題,結束考試!");
			return true;
		}
}

 執行結果:

Student 1已做好準備
Student 4已做好準備
Student 2已做好準備
Student 3已做好準備
Student 0已做好準備
全部考生都已做好準備,考生人數:5
Student 0開始做第1題
Student 4開始做第1題
Student 3開始做第1題
Student 1開始做第1題
Student 2開始做第1題
Student 2完成第1題,做題時間:32
Student 0完成第1題,做題時間:59
Student 3完成第1題,做題時間:60
Student 1完成第1題,做題時間:69
Student 4完成第1題,做題時間:96
所有考生都已完成第1題
Student 0開始做第2題
Student 1開始做第2題
Student 4開始做第2題
Student 2開始做第2題
Student 3開始做第2題
Student 2完成第2題,做題時間:8
Student 0完成第2題,做題時間:38
Student 3完成第2題,做題時間:69
Student 1完成第2題,做題時間:73
Student 4完成第2題,做題時間:87
所有考生都已完成第2題
Student 1開始做第3題
Student 3開始做第3題
Student 2開始做第3題
Student 0開始做第3題
Student 4開始做第3題
Student 2完成第3題,做題時間:23
Student 1完成第3題,做題時間:25
Student 0完成第3題,做題時間:25
Student 4完成第3題,做題時間:37
Student 3完成第3題,做題時間:57
所有考生都已完成第3題,結束考試!
Phaser has finished:true