Java8 Stream 平行計算實現的原理
這兩天組內的小夥伴在學習Java8,推廣在新專案內使用新特性。正好看到了Stream 帶來的遍歷的多執行緒併發:
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 8, 0, 1)
.stream()
.parallel()
.collect(Collectors.groupingBy(x -> x % 10))
.forEach((x, y) -> System.out.println(x + ":" + y));
和小夥伴一起試著用各種玩法玩了一下Java8的函數語言程式設計特性之後,感嘆到這樣子併發計算真的是越來越簡單了的。
但是深入思考之後就會很自然的想到一個問題,這個過程中,我們並沒有顯示的告訴Stream,我們需要多少個執行緒進行平行計算?我們能否複用之前的執行緒進行計算?
帶著這個問題我們先打開了VisualVM,檢視一下我們執行這樣一個任務會啟動多少個執行緒?
我們可以看到預設的parallel計算啟動了三個執行緒進行並行。這三個執行緒是怎麼來的呢?抱著這個問題,我們來參考一下Jdk8的原始碼,來看看它是如何設定這個值的。
我們知道Stream 是一個惰性求值的系統(如何進行惰性求值,我會在另一篇部落格中進行分析),那麼我們只需要找它最後求值的過程,看它是怎樣進行求值的就可以了。在AbstractPipeline 這個類裡面我們找到了Stream 計算的最終求值過程的預設實現:
/** * Evaluate the pipeline with a terminal operation to produce a result. * * @param <R> the type of result * @param terminalOp the terminal operation to be applied to the pipeline. * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
在15行,我們可以看到,在求值的時候會檢查平行計算的標誌位,如果標誌了平行計算的話,我們就會並行求值,反之則會序列求值。我們可以進一步進入並行求值的邏輯中,這是一個TerminalOp的預設介面方法,預設實現就是直接呼叫序列求值,在FindOp、ForEachOp、MatchOp 和 ReduceOp 中得到了覆蓋。
如FindOp的程式碼示例,這四個操作都是建立一個Task的示例,然後執行invoke方法。這些Task的繼承關係如圖:@Override public <P_IN> O evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return new FindTask<>(this, helper, spliterator).invoke(); }
可以看出所有的Task 都繼承自Jdk7 中引入的ForkJoin 並行框架的ForkJoinTask。所以我們可以看出Stream 的並行是依賴於ForkJoin 框架的。以AbstractTask 為例我們看看它是如何進行平行計算的:
@Override
public void compute() {
Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
long sizeEstimate = rs.estimateSize();
long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings("unchecked") K task = (K) this;
while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
K leftChild, rightChild, taskToFork;
task.leftChild = leftChild = task.makeChild(ls);
task.rightChild = rightChild = task.makeChild(rs);
task.setPendingCount(1);
if (forkRight) {
forkRight = false;
rs = ls;
task = leftChild;
taskToFork = rightChild;
}
else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
taskToFork.fork();
sizeEstimate = rs.estimateSize();
}
task.setLocalResult(task.doLeaf());
task.tryComplete();
}
ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
Spliterator spliterator) {
super(parent, spliterator);
this.op = parent.op;
}
@Override
protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator spliterator) {
return new ReduceTask<>(this, spliterator);
}
這裡面的主要邏輯就是
先呼叫當前splititerator 方法的estimateSize 方法,預估這個分片中的資料量
根據預估的資料量獲取最小處理單元的大小閾值,即當資料量已經小於這個閾值的時候進行計算,否則進行fork 將任務劃分成更小的資料塊,進行求解。這裡值得注意的是,getTargetSize 在第一次呼叫的時候會設定:
預測資料量大小 / (預設併發度 * 4) 的結果作為最小執行單元的數量(配置的預設值是cpu 數 – 1,可以通過java.util.concurrent.ForkJoinPool.common.parallelism設定)
如果當前分片大小仍然大於處理資料單元的閾值,且分片繼續嘗試切分成功,那麼就繼續切分,分別將左右分片的任務建立為新的Task,並且將當前的任務關聯為兩個新任務的父級任務(邏輯在makeChild 裡面)
先後對左右子節點的任務進行fork,對另外的分割槽進行分解。同時設定pending 為1,這代表一個task 實際上只會有一個等待的子節點(被fork)。
當任務已經分解到足夠小的時候退出迴圈,嘗試進行結束。呼叫子類實現的doLeaf方法,完成最小計算單元的計算任務,並設定到當前任務的localResult中
呼叫tryComplete 方法進行最終任務的掃尾工作,如果該任務pending 值不等於0,則原子的減1,如果已經等於0,說明任務都已經完成,則呼叫onCompletion 回撥,如果該任務是葉子任務,則直接銷燬中間資料結束;如果是中間節點會將左右子節點的結果進行合併
檢查如果這個任務已經沒有父級任務了,則將該任務置為正常結束,如果還有則嘗試遞迴的去呼叫父級節點的onCompletion回撥,逐級進行任務的合併。
public final void tryComplete() {
CountedCompleter<?> a = this, s = a;
for (int c;;) {
if ((c = a.pending) == 0) {
a.onCompletion(s);
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
return;
}
}
說了這麼多,大家也基本理解了Stream 的實現原理了。其實本質上就是在ForkJoin上進行了一層封裝,將Stream 不斷嘗試分解成更小的split,然後使用fork/join 框架分而治之。
所以我們以往關於Fork/Join 的經驗也都可以派上用場,可以解答之前我們的幾個疑問:
我在visualvm 中看到的 parallize 的3個執行緒是怎麼來的?
答:由於 taskToFork.fork() 呼叫,parallize使用了預設的ForkJoinPool.common 預設的一個靜態執行緒池,這個執行緒池的預設執行緒個數是cpu 數量-1。由於我的程式碼是執行在四個邏輯核心的MacBook 上,所以這裡的執行緒個數為3。如下面程式碼和註釋所示:
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
如何控制parallize 的執行緒數?答:我們可以自己構建一個ForkJoinPool,向其中提交一個parallize 任務,可以做到控制併發度。如以下示例程式碼:我們將之前的stream 的過程構造成一個runnable 的lambda 匿名函式 ()-> {…}。提交至執行緒池中,就可以按照我們想要的併發度進行計算了。
ForkJoinPool pool = new ForkJoinPool(2);
ret = pool.submit(() -> {
return LongStream.range(1, 50 * 1024 * 1024).boxed().collect(Collectors.toList())
.stream()
.parallel()
.map(x -> x * 2)
.filter(x -> x < 1500)
.reduce((x,y) -> x+y)
.get();
}).get();
接下來打算繼續深入這兩個很有意思的問題:深入介紹一下Stream的惰性求值過程,最好能跟Scala 的Stream 實現進行比較:Java8 Stream 惰性求值實現分析 – 驢和羊
深入介紹ForkJoin 的底層實現,包括它是如何進行執行緒排程和cache line sharing 優化的
參考文獻:
相關推薦
Java8 Stream 平行計算實現的原理
轉自:http://lvheyang.com/?p=87 這兩天組內的小夥伴在學習Java8,推廣在新專案內使用新特性。正好看到了Stream 帶來的遍歷的多執行緒併發: Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 8, 0, 1)
PCB 加投率計算實現基本原理--K最近鄰算法(KNN)
最近鄰 plist 控制 str 驗收 階段 分享圖片 數據量 出現 PCB行業中,客戶訂購5000pcs,在投料時不會直接投5000pcs,因為實際在生產過程不可避免的造成PCB報廢, 所以在生產前需計劃多投一定比例的板板, 例:訂單 量是5000pcs,加投3%,那就
CUDA實現矩陣相加的平行計算
(一)目的 熟悉基本的CUDA程式架構以及如何呼叫相應的API進行CUDA程式設計 (二)內容 完成矩陣相加的並行程式的實現(不用share memory實現) 要求: 實現2個矩陣(32*32)的相加,M矩陣的初始值全為2,N矩陣的初始值全為5。同時用C
利用CountDownLatch實現平行計算
import java.util.concurrent.CountDownLatch; /** * @Author pipi * @Date 2018/10/15 13:56 **/ public class ParallelComputing { private int[] nums;
利用java8 stream api 實現List集合分頁獲取工具
package com.test.paging; import java.util.Arrays; import java.util.List; /** * @author kevin.chen * Date 2017/11/9 * Time 18:05
Java8之ScheduledThreadPoolExecutor實現原理
ScheduledThreadPoolExecutor是一個可實現定時任務的執行緒池,ScheduledThreadPoolExecutor內的任務既可以在設定的時間到達時執行一次,也可以相隔固定時間週期執行。 ScheduledThreadPoolExecutor繼承自T
java8 Stream 一行實現列表去重
通常情況下,實現列表去重的方式有: 建立一個新的result列表,迴圈原列表,如果元素不在result列表中,則放入 建立一個HashSet,將列表作為構造引數傳入 下面介紹一種簡單、高效的去重方式,藉助於java8新特性引入的Stream類。程式碼如下: 實體類Per
hashmap實現原理(雜湊值計算,put方法,擴容) jdk1.8帶來的優化 hashmap併發安全 ConcurrentHashMap
HashMap的原始碼,實現原理,JDK8中對HashMap做了怎樣的優化。 ArrayList和LinkedList的優缺點——陣列的特點是:定址容易,插入和刪除困難;而連結串列的特點是:定址困難,插入和刪除容易。 hashmap底層
23. matlab平行計算原理以及parpool函式
宣告: 因電腦問題, 只做記錄,以後嘗試。 出處: Matlab並行運算 - wenyusuran的專欄 - CSDN部落格 https://blog.csdn.net/wenyusuran/article/details/28901727
雲端計算學習筆記002---雲端計算的理解及介紹,google雲端計算平臺實現原理
什麼是雲端計算: l 說的明白一點: • 雲端計算其實就更大限度的發揮網路的資源。 • 那為什麼叫雲,為什麼不到網際網路計算?Cloud l 大多數計算的網路拓撲圖都用一塊“雲”來表示網際網路。於是就形成了雲端計算的說法 l 狹義:是指IT基礎設施的交付和
並行程式設計報告(MPI平行計算π,實現mandelbrot集)
一.熟悉MPI並行程式設計環境 1.硬體 電腦:HP暗夜精靈 記憶體:4G 處理器:ntel® Core™ i5-6300HQ CPU @ 2.30GHz × 4 顯示卡:NVIDIA 960M 2.軟體 系統:Ubuntu 16.04
平行計算作業補充(Python實現)
前情提要 之前由於平行計算的作業被視為類同或抄襲網路,正好最近學習Python,於是剛好在這裡作一個補充,新方法採用Python實現並行,一來學習用,二來為了完成作業。 Python平行計算 Python在平行計算方面使用的是GIL(Global Interp
Java 8-stream實現原理分析(一)
背景介紹 Java 8中引入了lambda和stream,極大的簡化了程式碼編寫工作,但是簡單的程式碼為何能實現如何豐富的功能,其背後又是如何實現的呢? Spliterator和Iterator Iterator Iterator是Ja
轉置矩陣的分塊並行乘法(C語言實現),計算矩陣C[rawn][rawn]=A[rawm][rawn]'*B[rawm][rawn],子塊大小為S*T,其演算法實現原理參加本程式碼的附件。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <math.h> #define rawm 4 #define rawn 4 #defi
巧用zookeeper實現分散式平行計算
雲端計算的技術話題中少不了“分散式”,“平行計算” 這些個關鍵詞,我們知道硬體擴充套件的條件(Scale-up)始終是有限制的,將計算分散到網路中更多機器的CPU上提供更高的計算效能(Scale-out),並在這基礎上能將計算同時進行,那麼總體計算瓶頸會減小,計算的效能會
python--利用concurrent.futures 來實現真正的平行計算
由於python的全域性解釋鎖(GIL)使得我們無法使用執行緒進行真正的平行計算,因此,我們把總計算量分配到多個獨立的任務中,並在多個CPU和欣賞同時執行任務是很難實現的。 為解決該類問題,提高程式碼執行速率,我們嘗試可以引入concurrent.futures
win7之後的系統的CPU佔用計算的原理與實現
經過比對,發現procexp和工作管理員在計算程序cpu佔用上面存在很大的差異,經過研究發現,procexp顯示的是正確的,而工作管理員顯示的是錯誤的,工作管理員是用以前老的方式計算的。 新的cpu計算原理應該是: 程序CPU佔用率 =
平行計算--Java--求π並行實現
publicvoid run() { int i; step=1.0/(double)num_steps_wy; for(i=start_wy;i<num_steps_wy;i+=2) { x=(i+0.5)*st
Linux虛擬機器中配置多節點MPI實現平行計算完整版
虛擬機器CentOS中配置MPI多節點平行計算完整版,這是一個新手教程,從一個空白的CentOS到能正常執行的基礎環境,儘量詳細寫了每個步驟 我的環境如下: 宿主機:Windows 10 Pro 虛擬機器:VMware 12 + CentOS 6.6 一、 虛擬機器網路配置
Scala中Stream的應用場景及其實現原理
假設一個場景 需要在50個隨機數中找到前兩個可以被3整除的數字。 聽起來很簡單,我們可以這樣來寫: 1 2 3 4 5 6 7 8 9 def randomList = (1 to 50).map(_ =>