1. 程式人生 > 程式設計 >關於多執行緒計算結果同步

關於多執行緒計算結果同步

前言

日常搬磚,接到了將序列呼叫RPC服務(不要問我為啥序列 我也不明白)的業務邏輯優化的活兒,經過苦心思考,問了周邊大神同學,醍醐灌頂,有了此文。

多執行緒獲取計算結果方式

一想到多執行緒查詢資料再彙總,我想到的是以下三種方式,根據業務及效能選擇最優解。

Future同步式

  • Future同步式,想必大家都很清楚,就是get方法,原始碼註釋如下。

Waits if necessary for the computation to complete,and then retrieves its result. 釋義:Future.get()線上程計算結束前,一直處於等待狀態。

程式碼

public
static void main(String[] args) throws InterruptedException,ExecutionException
{ long l = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(2); Future<Integer> future = executorService.submit(() -> { System.out.println("執行耗時操作..."
); timeConsumeOp(); // 耗時3000ms return 100; }); Future<Integer> future1 = executorService.submit(() -> { System.out.println("執行耗時操作1..."); // 耗時2000ms timeConsumeOp1(); return 101; }); // 依次將future、future1新增進列表
List<Future<Integer>> futureList = Lists.newArrayList(future,future1); int s = 0; for (Future<Integer> future2 : futureList) { // 執行緒計算結果 System.out.println("返回結果:" + future2.get()); s = s + future2.get(); } // 求和 System.out.println("計算結果:" + s); // 主執行緒耗時 System.out.println("主執行緒耗時:" + (System.currentTimeMillis() - l)); } private static void timeConsumeOp() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } private static void timeConsumeOp1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } 複製程式碼
  • 結果
執行耗時操作...
執行耗時操作1...
返回結果:100
返回結果:101
計算結果:201
主執行緒耗時:3077
複製程式碼

解析

  • 呼叫是同步的,從主執行緒耗時可以看出,主執行緒處於等待狀態,等待計算結果返回才開始執行,因此耗時為3000ms左右。

  • 獲取計算結果取決於呼叫順序,儘管future1計算時間比future短(2000ms < 3000ms),應該是先得到101再得到100,但是Future.get()方法取決於呼叫順序,因為遍歷futureList時先呼叫的futureget方法。

  • 我們將futureList元素順序換下,看看結果。

// 先future1
List<Future<Integer>> futureList = Lists.newArrayList(future1,future);
複製程式碼
  • 結果
執行耗時操作...
執行耗時操作1...
返回結果:101
返回結果:100
計算結果:201
主執行緒耗時:3076
複製程式碼

CompletionService

  • CompletionService使用excutor實現任務執行,添加了LinkedBlockingQueue將完成的FutureTask新增進來。
	// 使用執行緒數為2的執行緒池初始化
    private static CompletionService<Integer> completionService = new ExecutorCompletionService<> (Executors.newFixedThreadPool(2));
複製程式碼

程式碼

public static void main(String[] args) throws InterruptedException,ExecutionException {
        long l = System.currentTimeMillis();
        
        Future<Integer> future = completionService.submit(() -> {
            System.out.println("執行耗時操作...");
            timeConsumeOp();
            return 100;
        });

        Future<Integer> future1 = completionService.submit(() -> {
            System.out.println("執行耗時操作1...");
            timeConsumeOp1();
            return 101;
        });

        int s = 0;

        for (int i = 0; i < 2; i++) {
            int result = completionService.take().get();
            System.out.println("返回結果:" + result);
            s += result;
        }

        System.out.println("計算結果:" + s);
        System.out.println("主執行緒耗時:" + (System.currentTimeMillis() - l));
    }

    private static void timeConsumeOp() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void timeConsumeOp1() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
複製程式碼
  • 結果
執行耗時操作...
執行耗時操作1...
返回結果:101
返回結果:100
計算結果:201
主執行緒耗時:3066
複製程式碼

解析

  • 呼叫是同步的,從主執行緒耗時可以看出,主執行緒處於等待狀態,等待計算結果返回才開始執行,因此耗時為3000ms左右。
  • 獲取計算結果取決於計算完成順序,儘管future1計算時間比future短(2000ms < 3000ms),因此得到101再得到100。

Future回撥式 + CountDownLatch

Future回撥式

  • 實現Future回撥式的方式有多種,Guava就提供了兩種方式,ListeningExecutorServiceCompletableFuture,也可以使用NettyFuture
  • 理解回撥式,Android開發的經驗就有用武之地了,其實就是非同步呼叫,像Android裡主執行緒負責渲染,訪問網路的執行緒得到結果了回撥觸發主執行緒修改UI。
  • 本文使用CompletableFuture實現,有興趣的可以使用其他方式。

CountDownLatch(CycleBarrier)

  • CountDownLatch類似於“計數器”,countdown()方法會將“計數器”減一,呼叫的await()的執行緒會在“計數器”為0之前,處於等待狀態。

結果儲存

  • 回撥式的特點在於計算結果是在工作執行緒中返回的,因此要將結果同步下,可以使用concurrent集合,或者使用類似Map<AtomicInteger,Object>資料結果將各個子執行緒的結果彙總下。

程式碼

public static void main(String[] args) throws Exception{
        CountDownLatch countDownLatch = new CountDownLatch(2);

        List<Integer> list = Collections.synchronizedList(new ArrayList<>(2));
        long l = System.currentTimeMillis();
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("執行耗時操作...");
            timeConsumeOp();
            return 100;
        });
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                System.out.println("執行耗時操作1...");
                timeConsumeOp1();
                return 101;
            }
        });

        completableFuture.whenComplete((integer,throwable) -> {

            if (throwable != null) {
                System.out.println("執行錯誤");
            } else {
                System.out.println("計算結果:" + integer + " 執行緒名:" + Thread.currentThread().getName());
                list.add(integer);
            }
            countDownLatch.countDown();

        });

        completableFuture1.whenComplete((integer,throwable) -> {

            if (throwable != null) {
                System.out.println("執行錯誤");
            } else {
                System.out.println("計算結果:" + integer + " 執行緒名:" + Thread.currentThread().getName());
                list.add(integer);
            }
            countDownLatch.countDown();
        });


        System.out.println("計算結果彙總前 主執行緒還在執行:" + (System.currentTimeMillis() - l));
        // 主執行緒等待
        countDownLatch.await();

        int s = 0;
        for (int i : list) {
            s += i;
        }
        System.out.println("計算結果為:" + s + " 耗時 " + (System.currentTimeMillis() - l));
    }
複製程式碼
  • 結果
執行耗時操作...
執行耗時操作1...
計算結果彙總前 主執行緒還在執行:70
計算結果:101 執行緒名:ForkJoinPool.commonPool-worker-2
計算結果:100 執行緒名:ForkJoinPool.commonPool-worker-1
計算結果為:201 耗時 3072
複製程式碼

解析

  • 回撥式,即非同步呼叫不會使主執行緒處於等待狀態。
  • countDownLatch.countDown()的時機要在計算結果儲存之後,否則可能會漏掉執行緒執行結果。
  • countDownLatch.await()主執行緒會等待所有的執行緒countDown後,開始執行,可以看到耗時。
  • 使用了Collections.synchronizedList,原因為結果在不同執行緒返回,加鎖保證計算結果儲存。

結語

本文綜合了幾種筆者能想到的方案,並進行了簡要分析,不見得都對,大家有更好的方案,可以與筆者交流。

參考文獻

www.cnkirito.moe/future-and-…