6.2阻礙迴圈(Cyclic Barriers)
一個cyclicbarrier讓一堆的執行緒處於等待狀態,它們彼此之間會有相同的阻礙點。這個阻礙是迴圈的,因為當等待的執行緒等到釋放且,它可能會被重複利用。這個同步器在應用的程式中是非常有用的,它會執行一個固定大小的執行緒,偶爾必須等待每一個執行緒。
java.util.concurrent.CyclicBarrier類繼續一個迴圈的阻礙同步器。你可以通過執行這個類的CyclicBarrier(int parties)的構造器去明確例項化一個CyclicBarrier的例項。當parties的值小於1時,這個構造器將會丟擲IllegalArgumentException。
實際上你也執行CyclicBarrier(int parties, Runnabl barrierAction)的構造器去初始化一個迴圈的阻礙部分;其中的當阻礙被執行時,barrierAction將會執行。換句話說,parties -1個執行緒在等待,當多個執行緒到達時,到達的執行緒將會執行barrierAction和所有執行緒都會向前執行。這個runnable在所有執行緒都在執行時,而去更新分享的狀態是非常的用的。當parties的值小於1時,就會丟擲IllegalArgument的錯誤。(在前面的構造器執行這個構造器時,如果放入的barrierAction為null,那麼當阻礙釋放後,將沒的執行緒會被執行。)
CyclicBarrier也提供瞭如下的方法:
1) int await():強迫正在請求的執行緒處於等待狀態,直到所有部分在cyclic barrier中都執行await()的方法。當它或其它等待執行緒被打斷,或其它執行緒在等待時超時,或在這個cyclic barrier中執行reset()的方法,那麼在請求的執行緒會停止等待。如果正在請求的執行緒自己在進行執行時被打斷,或在等待中被打斷,這個方法將會丟擲InterruptedException,並且請求的執行緒打斷狀態會被清空。任何執行緒在等待時被執行reset()的方法,或執行await()時被打斷,或任何等待執行緒被打斷,那麼將會丟擲BrokenBarrierException.當在等待時被打斷,或者等待的執行緒丟擲BrokenBarrierException,那麼障礙(barrier)都會處於打斷狀態。如果請求的執行緒是最後一個到達,和構造器的barrierAction不為null,在允許其它執行緒繼續執行前,將會讓請求的執行緒去執行。這個方法將會返回請求執行緒的索引值,這裡的索引getParties()-1說明從第一個執行緒到達到最後一個執行緒的達到。
2) int await(long timeout,TimeUnit unit):這個方法與前面的方法是相似的,唯一不一樣的是,它讓你明確要多長時間讓請求的執行緒進入到等待狀態。線上程等待期間而超時了,那麼將會丟擲java.util.concurrent.TimeoutException.
3) int getNumberWaiting():返回當前阻礙下等待的執行緒數量。這個方法對於打debug和輸出資訊很有用。
4) int getParties():返回需要呼叫障礙(barrier)的數量。
5) boolean isBroken():當一個或更多個部分被這個barrier打斷,因為打斷、超時、重置,或因為異常的丟擲錯誤,這個方法將會返回true,其它情況返回false.
6) void reset():重置這個障礙(barrier)去初始化它的狀態。如果任何在障礙(barrier)中等待,那麼將會丟擲BrokenBarrierException.
Cyclicbarriers在平形分隔的情況是很有用的。當一個長任務被分解為多個子任務,然後將子任務執行的結果再合併成一個結果返回。如下的Listing6-2的例子。
Listing6-2
package com.owen.thread.chapter6;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo
{
public static void main(String[] args)
{
float[][] matrix = new float[3][3];
int counter = 0;
for (int row = 0; row < matrix.length; row++)
for (int col = 0; col < matrix[0].length; col++)
matrix[row][col] = counter++;
dump(matrix);
System.out.println();
Solver solver = new Solver(matrix);
System.out.println();
dump(matrix);
}
static void dump(float[][] matrix)
{
for (int row = 0; row < matrix.length; row++)
{
for (int col = 0; col < matrix[0].length; col++)
System.out.print(matrix[row][col] + " ");
System.out.println();
}
}
}
class Solver
{
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable
{
int myRow;
boolean done = false;
Worker(int row)
{
myRow = row;
}
boolean done()
{
return done;
}
void processRow(int myRow)
{
System.out.println("Processing row: " + myRow);
for (int i = 0; i < N; i++)
data[myRow][i] *= 10;
done = true;
}
@Override
public void run()
{
while (!done())
{
processRow(myRow);
try
{
barrier.await();
} catch (InterruptedException ie)
{
return;
} catch (BrokenBarrierException bbe)
{
return;
}
}
}
}
public Solver(float[][] matrix)
{
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N, new Runnable()
{
@Override
public void run()
{
mergeRows();
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
void mergeRows()
{
System.out.println("merging");
synchronized ("abc")
{
"abc".notify();
}
}
void waitUntilDone()
{
synchronized ("abc")
{
try
{
System.out.println("main thread waiting");
"abc".wait();
System.out.println("main thread notified");
} catch (InterruptedException ie)
{
System.out.println("main thread interrupted");
}
}
}
}
首先主執行緒建立一個浮點型的陣列,然後通過dump的方法,將建立的陣列值輸出。然後執行緒會去例項化Solver的物件,這裡會分隔執行緒去計算每一行的值。也就是說修改原告陣列內的值,只是按行分隔修改。
Solver出現一個構造器,這個構造器中會去接收陣列的值的,並且儲存它的引用,當然還就是定義一個引數N去儲存陣列的大小。構造器建立一個有N部分和一個障礙(barrier)的執行動作的cyclic barrier。在障礙(barrier)的執行中,是將陣列計算的所有行的結果合併放入到陣列中的。最後,構造器建立一個工作的執行緒,這個工作執行緒會被切分多個工作執行,就是去修改每一行的矩陣值。構造器會處於等待狀,直到所有的工作都完成。
Worker會重複地去執行run()方法,執行run()方法會去執行processRow()的方法,這個方法會明確去修改矩陣行的值,直到done()返回true的值,方法才會停止執行。當明確processRow()的方法執行完了,那麼就會執行await()的方法,執行緒就不能繼續前進。
最為關鍵的是,所有工作執行緒都會執行await()的方法。當最後一個工作執行緒執行好了矩陣的修改後,就會執行await()的方法,然後觸發障礙(barrier)的動作,使用剛剛執行的所有工作執行緒合併成一個矩陣的結果。在這個例子是,合併不是必須的,但是在更為複雜的例子中,合併是必須的。
最後任務執行mergeRows()的方法,去喚醒主執行緒執行Solver的構造器。這個執行緒連線著一個字串“abc”物件,使執行緒處於等待狀態。呼叫notify()的方法,僅僅是喚醒在這個監聽器中的等待執行緒。
執行上面的程式碼,你可能會得到相似的結果:
0.0 1.0 2.0
3.0 4.0 5.0
6.0 7.0 8.0
Processing row: 0
main thread waiting
Processing row: 1
Processing row: 2
merging
main thread notified
0.0 10.0 20.0
30.0 40.0 50.0
60.0 70.0 80.0