fork/join 分支/合併框架和自動機制拆分流Spliterator
阿新 • • 發佈:2018-12-12
利用fork/join求和程式碼分析
// 整合RecursiveTask用來建立可以用於分支/合併框架的任務 public class ForkJoinSumCalculator extends RecursiveTask<Long> { // 不再將任務分解為子任務的陣列大小 public static final long THRESHOLD = 10_000; // 需要求和的陣列 private final long[] numbers; // 子任務處理陣列的起始和終止位置 private final int start; private final int end; // 建構函式用於建立主任務 public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } // 用於遞迴方式為主任務建立子任務 private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } // 這裡舉一個用分支/合併 框架的實際例子,用這個框架為一個數字範圍(這裡用一個 long[]陣列表示)求和 @Override protected Long compute() { // 該任務用於求和的部分 的大小 int length = end - start; // 小於閾值順序計算結果 if (length <= THRESHOLD) { return computeSequentially(); } // 建立子任務為陣列的前一半求和 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2); // 利用forkjoinpool執行緒非同步執行新建立的子任務 leftTask.fork(); // 建立一個子任務用於為陣列後半部分求和 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end); // 同步執行第二個子任務,有可能進一步進行遞迴劃分 Long rightResult = rightTask.compute(); // 讀取第一個子任務的執行結果,如果沒有執行完成就等待 Long leftResult = leftTask.join(); // 該任務的結果是兩個子任務的結果的組合 return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } // 現在編寫一個方法來並行對前n個自然數求和就很簡單了。你只需把想要的數字陣列傳給 ForkJoinSumCalculator的建構函式 public static long forkJoinSum(long n) { // 這裡用了一個LongStream來生成包含前n個自然數的陣列 long[] numbers = LongStream.rangeClosed(1, n).toArray(); // 然後建立一個ForkJoinTask (RecursiveTask的父類),並把陣列傳遞給ForkJoinSumCalculator的公共建構函式 ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); // 建立了一個新的ForkJoinPool,並把任務傳給它的呼叫方法 。在 ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果 return FORK_JOIN_POOL.invoke(task); } }
有幾點需要注意:
1.一個任務可以分解成多個獨立的子任務,才能讓效能在並行化時 有所提升
2.不應該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直 接呼叫compute或fork方法,只有順序程式碼才應該用invoke來啟動平行計算。
3.每個子任務都必須等待另一個子任務完成才能啟動
自動機制拆分流Spliterator
public interface Spliterator<T> { // T是Spliterator遍歷的元素的型別 // tryAdvance方法的行為類似於普通的 Iterator boolean tryAdvance(Consumer<? super T> action); // trySplit是專為Spliterator介面設計的,因為它可以把一些元素劃出去分 給第二個Spliterator(由該方法返回),讓它們兩個並行處理。 Spliterator<T> trySplit(); // estimateSize方法估計還剩下多少元素要遍歷,因為即使不那麼確切,能快速算出來是一個值 也有助於讓拆分均勻一點 long estimateSize(); // characteristics方法,它將返回一個int,代表Spliterator本身特性集的編碼。使用Spliterator的客戶可以用這些特性來更好地控制和 優化它的使用。 int characteristics(); }
遞迴拆分過程
Spliterator的特性 :
ORDERED 元素有既定的順序(例如List),因此Spliterator在遍歷和劃分時也會遵循這一順序
DISTINCT 對於任意一對遍歷過的元素x和y,x.equals(y)返回false
SORTED 遍歷的元素按照一個預定義的順序排序
SIZED 該Spliterator由一個已知大小的源建立(例如Set),因此estimatedSize()返回的是準確值
NONNULL 保證遍歷的元素不會為null IMMUTABL E Spliterator的資料來源不能修改。這意味著在遍歷時不能新增、刪除或修改任何元素
CONCURRE NT 該Spliterator的資料來源可以被其他執行緒同時修改而無需同步
SUBSIZED 該Spliterator和所有從它拆分出來的Spliterator都是SIZED