1. 程式人生 > >6.2阻礙迴圈(Cyclic Barriers)

6.2阻礙迴圈(Cyclic Barriers)

 一個cyclicbarrier讓一堆的執行緒處於等待狀態,它們彼此之間會有相同的阻礙點。這個阻礙是迴圈的,因為當等待的執行緒等到釋放且,它可能會被重複利用。這個同步器在應用的程式中是非常有用的,它會執行一個固定大小的執行緒,偶爾必須等待每一個執行緒。

     java.util.concurrent.CyclicBarrier類繼續一個迴圈的阻礙同步器。你可以通過執行這個類的CyclicBarrier(int parties)的構造器去明確例項化一個CyclicBarrier的例項。當parties的值小於1時,這個構造器將會丟擲IllegalArgumentException。

     實際上你也執行CyclicBarrier(int parties, Runnabl barrierAction)的構造器去初始化一個迴圈的阻礙部分;其中的當阻礙被執行時,barrierAction將會執行。換句話說,parties -1個執行緒在等待,當多個執行緒到達時,到達的執行緒將會執行barrierAction和所有執行緒都會向前執行。這個runnable在所有執行緒都在執行時,而去更新分享的狀態是非常的用的。當parties的值小於1時,就會丟擲IllegalArgument的錯誤。(在前面的構造器執行這個構造器時,如果放入的barrierAction為null,那麼當阻礙釋放後,將沒的執行緒會被執行。)

     CyclicBarrier也提供瞭如下的方法:

1)        int await():強迫正在請求的執行緒處於等待狀態,直到所有部分在cyclic barrier中都執行await()的方法。當它或其它等待執行緒被打斷,或其它執行緒在等待時超時,或在這個cyclic barrier中執行reset()的方法,那麼在請求的執行緒會停止等待。如果正在請求的執行緒自己在進行執行時被打斷,或在等待中被打斷,這個方法將會丟擲InterruptedException,並且請求的執行緒打斷狀態會被清空。任何執行緒在等待時被執行reset()的方法,或執行await()時被打斷,或任何等待執行緒被打斷,那麼將會丟擲BrokenBarrierException.當在等待時被打斷,或者等待的執行緒丟擲BrokenBarrierException,那麼障礙(barrier)都會處於打斷狀態。如果請求的執行緒是最後一個到達,和構造器的barrierAction不為null,在允許其它執行緒繼續執行前,將會讓請求的執行緒去執行。這個方法將會返回請求執行緒的索引值,這裡的索引getParties()-1說明從第一個執行緒到達到最後一個執行緒的達到。

2)        int await(long timeout,TimeUnit unit):這個方法與前面的方法是相似的,唯一不一樣的是,它讓你明確要多長時間讓請求的執行緒進入到等待狀態。線上程等待期間而超時了,那麼將會丟擲java.util.concurrent.TimeoutException.

3)        int getNumberWaiting():返回當前阻礙下等待的執行緒數量。這個方法對於打debug和輸出資訊很有用。

4)        int getParties():返回需要呼叫障礙(barrier)的數量。

5)        boolean isBroken():當一個或更多個部分被這個barrier打斷,因為打斷、超時、重置,或因為異常的丟擲錯誤,這個方法將會返回true,其它情況返回false.

6)        void reset():重置這個障礙(barrier)去初始化它的狀態。如果任何在障礙(barrier)中等待,那麼將會丟擲BrokenBarrierException.

 Cyclicbarriers在平形分隔的情況是很有用的。當一個長任務被分解為多個子任務,然後將子任務執行的結果再合併成一個結果返回。如下的Listing6-2的例子。

Listing6-2

package com.owen.thread.chapter6;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo
{

	public static void main(String[] args)
	{
		float[][] matrix = new float[3][3];
		int counter = 0;
		for (int row = 0; row < matrix.length; row++)
			for (int col = 0; col < matrix[0].length; col++)
				matrix[row][col] = counter++;
		dump(matrix);
		System.out.println();
		Solver solver = new Solver(matrix);
		System.out.println();
		dump(matrix);
	}

	static void dump(float[][] matrix)
	{
		for (int row = 0; row < matrix.length; row++)
		{
			for (int col = 0; col < matrix[0].length; col++)
				System.out.print(matrix[row][col] + " ");
			System.out.println();
		}
	}
}

class Solver
{
	final int N;
	final float[][] data;
	final CyclicBarrier barrier;

	class Worker implements Runnable
	{
		int myRow;
		boolean done = false;

		Worker(int row)
		{
			myRow = row;
		}

		boolean done()
		{
			return done;
		}

		void processRow(int myRow)
		{
			System.out.println("Processing row: " + myRow);
			for (int i = 0; i < N; i++)
				data[myRow][i] *= 10;
			done = true;
		}

		@Override
		public void run()
		{
			while (!done())
			{
				processRow(myRow);
				try
				{
					barrier.await();
				} catch (InterruptedException ie)
				{
					return;
				} catch (BrokenBarrierException bbe)
				{
					return;
				}
			}
		}
	}

	public Solver(float[][] matrix)
	{
		data = matrix;
		N = matrix.length;
		barrier = new CyclicBarrier(N, new Runnable()
		{
			@Override
			public void run()
			{
				mergeRows();
			}
		});

		for (int i = 0; i < N; ++i)
			new Thread(new Worker(i)).start();
		waitUntilDone();
	}

	void mergeRows()
	{
		System.out.println("merging");
		synchronized ("abc")
		{
			"abc".notify();
		}
	}

	void waitUntilDone()
	{
		synchronized ("abc")
		{
			try
			{
				System.out.println("main thread waiting");
				"abc".wait();
				System.out.println("main thread notified");
			} catch (InterruptedException ie)
			{
				System.out.println("main thread interrupted");
			}
		}
	}
}

首先主執行緒建立一個浮點型的陣列,然後通過dump的方法,將建立的陣列值輸出。然後執行緒會去例項化Solver的物件,這裡會分隔執行緒去計算每一行的值。也就是說修改原告陣列內的值,只是按行分隔修改。

     Solver出現一個構造器,這個構造器中會去接收陣列的值的,並且儲存它的引用,當然還就是定義一個引數N去儲存陣列的大小。構造器建立一個有N部分和一個障礙(barrier)的執行動作的cyclic barrier。在障礙(barrier)的執行中,是將陣列計算的所有行的結果合併放入到陣列中的。最後,構造器建立一個工作的執行緒,這個工作執行緒會被切分多個工作執行,就是去修改每一行的矩陣值。構造器會處於等待狀,直到所有的工作都完成。

     Worker會重複地去執行run()方法,執行run()方法會去執行processRow()的方法,這個方法會明確去修改矩陣行的值,直到done()返回true的值,方法才會停止執行。當明確processRow()的方法執行完了,那麼就會執行await()的方法,執行緒就不能繼續前進。

     最為關鍵的是,所有工作執行緒都會執行await()的方法。當最後一個工作執行緒執行好了矩陣的修改後,就會執行await()的方法,然後觸發障礙(barrier)的動作,使用剛剛執行的所有工作執行緒合併成一個矩陣的結果。在這個例子是,合併不是必須的,但是在更為複雜的例子中,合併是必須的。

     最後任務執行mergeRows()的方法,去喚醒主執行緒執行Solver的構造器。這個執行緒連線著一個字串“abc”物件,使執行緒處於等待狀態。呼叫notify()的方法,僅僅是喚醒在這個監聽器中的等待執行緒。

     執行上面的程式碼,你可能會得到相似的結果:

0.0 1.0 2.0 
3.0 4.0 5.0 
6.0 7.0 8.0 

Processing row: 0
main thread waiting
Processing row: 1
Processing row: 2
merging
main thread notified

0.0 10.0 20.0 
30.0 40.0 50.0 
60.0 70.0 80.0