關於多執行緒計算結果同步
阿新 • • 發佈:2019-12-31
前言
日常搬磚,接到了將序列呼叫RPC服務(不要問我為啥序列 我也不明白)的業務邏輯優化的活兒,經過苦心思考,問了周邊大神同學,醍醐灌頂,有了此文。
多執行緒獲取計算結果方式
一想到多執行緒查詢資料再彙總,我想到的是以下三種方式,根據業務及效能選擇最優解。
Future同步式
- Future同步式,想必大家都很清楚,就是
get
方法,原始碼註釋如下。
Waits if necessary for the computation to complete,and then retrieves its result. 釋義:Future.get()線上程計算結束前,一直處於等待狀態。
程式碼
public static void main(String[] args) throws InterruptedException,ExecutionException {
long l = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> future = executorService.submit(() -> {
System.out.println("執行耗時操作..." );
timeConsumeOp();
// 耗時3000ms
return 100;
});
Future<Integer> future1 = executorService.submit(() -> {
System.out.println("執行耗時操作1...");
// 耗時2000ms
timeConsumeOp1();
return 101;
});
// 依次將future、future1新增進列表
List<Future<Integer>> futureList = Lists.newArrayList(future,future1);
int s = 0;
for (Future<Integer> future2 : futureList) {
// 執行緒計算結果
System.out.println("返回結果:" + future2.get());
s = s + future2.get();
}
// 求和
System.out.println("計算結果:" + s);
// 主執行緒耗時
System.out.println("主執行緒耗時:" + (System.currentTimeMillis() - l));
}
private static void timeConsumeOp() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void timeConsumeOp1() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製程式碼
- 結果
執行耗時操作...
執行耗時操作1...
返回結果:100
返回結果:101
計算結果:201
主執行緒耗時:3077
複製程式碼
解析
-
呼叫是同步的,從主執行緒耗時可以看出,主執行緒處於等待狀態,等待計算結果返回才開始執行,因此耗時為3000ms左右。
-
獲取計算結果取決於呼叫順序,儘管
future1
計算時間比future
短(2000ms < 3000ms),應該是先得到101再得到100,但是Future.get()
方法取決於呼叫順序,因為遍歷futureList
時先呼叫的future
的get
方法。 -
我們將
futureList
元素順序換下,看看結果。
// 先future1
List<Future<Integer>> futureList = Lists.newArrayList(future1,future);
複製程式碼
- 結果
執行耗時操作...
執行耗時操作1...
返回結果:101
返回結果:100
計算結果:201
主執行緒耗時:3076
複製程式碼
CompletionService
-
CompletionService
使用excutor
實現任務執行,添加了LinkedBlockingQueue
將完成的FutureTask
新增進來。
// 使用執行緒數為2的執行緒池初始化
private static CompletionService<Integer> completionService = new ExecutorCompletionService<> (Executors.newFixedThreadPool(2));
複製程式碼
程式碼
public static void main(String[] args) throws InterruptedException,ExecutionException {
long l = System.currentTimeMillis();
Future<Integer> future = completionService.submit(() -> {
System.out.println("執行耗時操作...");
timeConsumeOp();
return 100;
});
Future<Integer> future1 = completionService.submit(() -> {
System.out.println("執行耗時操作1...");
timeConsumeOp1();
return 101;
});
int s = 0;
for (int i = 0; i < 2; i++) {
int result = completionService.take().get();
System.out.println("返回結果:" + result);
s += result;
}
System.out.println("計算結果:" + s);
System.out.println("主執行緒耗時:" + (System.currentTimeMillis() - l));
}
private static void timeConsumeOp() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void timeConsumeOp1() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製程式碼
- 結果
執行耗時操作...
執行耗時操作1...
返回結果:101
返回結果:100
計算結果:201
主執行緒耗時:3066
複製程式碼
解析
- 呼叫是同步的,從主執行緒耗時可以看出,主執行緒處於等待狀態,等待計算結果返回才開始執行,因此耗時為3000ms左右。
-
獲取計算結果取決於計算完成順序,儘管
future1
計算時間比future
短(2000ms < 3000ms),因此得到101再得到100。
Future回撥式 + CountDownLatch
Future回撥式
- 實現Future回撥式的方式有多種,
Guava
就提供了兩種方式,ListeningExecutorService
、CompletableFuture
,也可以使用Netty
的Future
。 - 理解回撥式,Android開發的經驗就有用武之地了,其實就是非同步呼叫,像Android裡主執行緒負責渲染,訪問網路的執行緒得到結果了回撥觸發主執行緒修改UI。
- 本文使用
CompletableFuture
實現,有興趣的可以使用其他方式。
CountDownLatch(CycleBarrier)
- CountDownLatch類似於“計數器”,
countdown()
方法會將“計數器”減一,呼叫的await()
的執行緒會在“計數器”為0之前,處於等待狀態。
結果儲存
- 回撥式的特點在於計算結果是在工作執行緒中返回的,因此要將結果同步下,可以使用
concurrent
集合,或者使用類似Map<AtomicInteger,Object>
資料結果將各個子執行緒的結果彙總下。
程式碼
public static void main(String[] args) throws Exception{
CountDownLatch countDownLatch = new CountDownLatch(2);
List<Integer> list = Collections.synchronizedList(new ArrayList<>(2));
long l = System.currentTimeMillis();
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("執行耗時操作...");
timeConsumeOp();
return 100;
});
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
System.out.println("執行耗時操作1...");
timeConsumeOp1();
return 101;
}
});
completableFuture.whenComplete((integer,throwable) -> {
if (throwable != null) {
System.out.println("執行錯誤");
} else {
System.out.println("計算結果:" + integer + " 執行緒名:" + Thread.currentThread().getName());
list.add(integer);
}
countDownLatch.countDown();
});
completableFuture1.whenComplete((integer,throwable) -> {
if (throwable != null) {
System.out.println("執行錯誤");
} else {
System.out.println("計算結果:" + integer + " 執行緒名:" + Thread.currentThread().getName());
list.add(integer);
}
countDownLatch.countDown();
});
System.out.println("計算結果彙總前 主執行緒還在執行:" + (System.currentTimeMillis() - l));
// 主執行緒等待
countDownLatch.await();
int s = 0;
for (int i : list) {
s += i;
}
System.out.println("計算結果為:" + s + " 耗時 " + (System.currentTimeMillis() - l));
}
複製程式碼
- 結果
執行耗時操作...
執行耗時操作1...
計算結果彙總前 主執行緒還在執行:70
計算結果:101 執行緒名:ForkJoinPool.commonPool-worker-2
計算結果:100 執行緒名:ForkJoinPool.commonPool-worker-1
計算結果為:201 耗時 3072
複製程式碼
解析
- 回撥式,即非同步呼叫不會使主執行緒處於等待狀態。
-
countDownLatch.countDown()
的時機要在計算結果儲存之後,否則可能會漏掉執行緒執行結果。 -
countDownLatch.await()
主執行緒會等待所有的執行緒countDown
後,開始執行,可以看到耗時。 - 使用了
Collections.synchronizedList
,原因為結果在不同執行緒返回,加鎖保證計算結果儲存。
結語
本文綜合了幾種筆者能想到的方案,並進行了簡要分析,不見得都對,大家有更好的方案,可以與筆者交流。