並行流parallel,CompletableFuture與Executors執行緒池的使用與區別
阿新 • • 發佈:2018-12-30
list並行操作在專案開發可以極大提高程式碼效率與效能,java8對其進行了很好的封裝,簡單使用研究一下:
1. 先自己建立一個list:
// list在實際使用中要注意執行緒安全,Collections.synchronizedList寫操作效能高,CopyOnWriteArrayList讀操作效能較好
List<String> list = Arrays.asList(new String[10000]);
2. parallel並行流使用:
list.stream().parallel().forEach(a -> {
// 操作程式碼.....
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
並行流特點:
基於伺服器核心的限制,如果你是八核,每次執行緒只能起八個,不能自定義執行緒池;
適用於對list密集計算操作充分利用CPU資源,如果需要呼叫遠端服務不建議使用;
3. CompletableFuture使用
3.1 未使用自定義執行緒池:
// supplyAsync需要有返回值,runAsync不需要有返回值
list.stream() .map(a -> CompletableFuture.supplyAsync(() -> {
// 操作程式碼.....
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return a;
})).collect(Collectors.toList()).stream().map (CompletableFuture::join).collect(Collectors.toList());
劃重點:
未自定義執行緒池時預設執行緒池跟並行流一樣,都是根據伺服器核心數建立執行緒數量。
3.2 使用自定義執行緒池:
ExecutorService executor = Executors.newFixedThreadPool(Math.min(list.size(), 100));
list.stream().map(a -> CompletableFuture.supplyAsync(() -> {
// 操作程式碼.....
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return a;
}, executor)).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
補充:
1. 執行緒數量的計算公式:
T(執行緒數) = N(伺服器核心數) * u(期望cpu利用率) * (1 + E(等待時間)/C(計算時間));
2. 獲取伺服器核心數:
int count = Runtime.getRuntime().availableProcessors();
3.劃重點:
此處join方法和CompletableFuture的get()方法類似,都是阻塞執行緒,等待結果,但是join方法不拋異常,不需要處理異常,讓你程式碼更方便,get方法拋異常。
4. Executors使用(有多種執行緒池)
list.forEach(a ->
executor.submit(() -> {
// 操作程式碼.....
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
);
executor.shutdown();
while (true) {
if (executor.isTerminated()) {
System.out.println("執行緒執行完畢!!!");
break;
}
Thread.sleep(10);
}
5. 簡單總結:
可以將程式碼貼上到idea中執行把執行時間打出來看看效果,最後發現:
- parallel與未定義自執行緒池的CompletableFuture效果差別不大,原因是底層都使用的預設的執行緒池;
- CompletableFuture自定義執行緒池與Executors的執行效果差別不大,但CompletableFuture有很多組合式的非同步程式設計方法:
runAsync:非同步執行沒有返回值;
supplyAsync:非同步執行有返回值;
thenApply:繼續執行當前執行緒future完成的函式,不需要阻塞等待其處理完成;
thenApplyAsync:在不同執行緒池非同步地應用引數中的函式;
thenCompose:用於多個彼此依賴的futrue進行串聯起來
thenCombine:並聯起兩個獨立的future,注意,這些future都是在長時間計算都完成以後