1. 程式人生 > 其它 >CompletableFutures多執行緒阻塞獲取結果

CompletableFutures多執行緒阻塞獲取結果

voidCompletableFuture.get();
voidCompletableFuture.join();

一樣會阻塞當前執行緒,直到所有子任務都完成一起列印結果

package com.async;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author Allen
 * @version 1.0
 * @date 2021/8/12 20:23
 * @功能說明
 */
public class AsyncTest {
    public static void main(String[] args) {
        System.out.println();
        long l = System.currentTimeMillis();
//        任務列表
        List<Integer> jobList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
        int nThreads = 3;
//        工作執行緒池
        ExecutorService executorService = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
//        接收每個任務的執行結果
        List<CompletableFuture<String>> completableFutures = new ArrayList<>(jobList.size());

//        任務分派
        for (int i = 0; i < jobList.size(); i++) {

            try {
                Integer jobId = jobList.get(i);

                CompletableFuture<String> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
//                    睡幾秒
                    return jobId + "任務執行結果";
                }, executorService);
//            儲存本次任務執行結果
                completableFutures.add(integerCompletableFuture);
            } catch (Exception e) {
            }

        }


//        等待全部任務執行完畢
        CompletableFuture[] completableFutures1 = new CompletableFuture[completableFutures.size()];
        CompletableFuture<Void> voidCompletableFuture =
                CompletableFuture.allOf(completableFutures.toArray(completableFutures1));
        try {
            
            voidCompletableFuture.get();
            //voidCompletableFuture.join();
            completableFutures.stream()
                    .forEach(tmp -> {
                        try {
                            System.out.println(tmp.get());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        } catch (Exception e) {
            e.printStackTrace();
        }

        executorService.shutdown();
        System.out.println(System.currentTimeMillis()-l);
    }
}

非同步多執行緒配置

這裡面可以新增@Bean註解返回的多執行緒 預設是使用getAsyncExecutor方法內的執行緒池

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("Anno-Executor"); //指定用於新建立的執行緒名稱的字首。
        executor.setCorePoolSize(10); //核心執行緒數
        executor.setMaxPoolSize(20);  //最大執行緒數
        executor.setQueueCapacity(1000); //佇列大小
        executor.setKeepAliveSeconds(300); //執行緒最大空閒時間
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略(一共四種,此處省略)

        executor.initialize();
        return executor;
    }

    // 異常處理器:當然你也可以自定義的,這裡我就這麼簡單寫了~~~
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

比如如下 如果需要指定對應其他的執行緒池@Async("taskExecutor") +異常處理方法

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {
private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);
 
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10); //核心執行緒數
    executor.setMaxPoolSize(20);  //最大執行緒數
    executor.setQueueCapacity(1000); //佇列大小
    executor.setKeepAliveSeconds(300); //執行緒最大空閒時間 
    executor.setThreadNamePrefix("ics-Executor-"); 指定用於新建立的執行緒名稱的字首。
    executor.setRejectedExecutionHandler(
	new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略
    return new ExceptionHandlingAsyncTaskExecutor(executor);
}
 
@Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }
 
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
 
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
            log.info("Exception message - " + throwable.getMessage());
            log.info("Method name - " + method.getName());
            for (Object param : objects) {
                log.info("Parameter value - " + param);
            }
        }
    }
}

開發程式碼例子


/**
 * 每天晚上10點持行一次
 */
@RequestMapping("/getUrlsByDate")
public void getUrlsByDate(String date) {
    log.info("指定日期獲取檔案");
    String toDay = date;
    File file = new File(fileAddressPrefix + toDay);
    log.info("檔案轉換:" + (fileAddressPrefix + toDay));
    File[] tempList = file.listFiles();
    if (tempList != null) {
        log.info("檔案轉換檔案個數:" + (tempList.length));
        int onece = 4;
        int time = tempList.length % onece == 0 ? tempList.length / onece : tempList.length / onece + 1;
        final CountDownLatch countDownLatch = new CountDownLatch(time);
        ArrayList<Future> futures = new ArrayList<>();
        //Semaphore semaphore = new Semaphore(5);
        for (int taskTime = 0; taskTime < time; taskTime++) {
            int start = taskTime * onece;
            int end = start + onece<tempList.length?start + onece:tempList.length;
            threadPoolTaskExecutor.submit(() -> {
                try {
                    Future<String> stringFuture = addFileBatchService.insertBatch(tempList, start,end, toDay);
                    futures.add(stringFuture);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        String s="";
        try {
            countDownLatch.await();
            System.out.println("----------------全部完成");
            for (Future future : futures) {
                //log.info(toDay + "執行結束"+future.get());
                s+=future.get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(s);
    }
}

countDownLatch沒有用 呼叫非同步方法直接走到finally

可以改用這樣寫法 指定對應的執行緒池 再加阻塞獲取全部結果的

voidCompletableFuture.get();/ voidCompletableFuture.join();

        CompletableFuture<String> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //                    睡幾秒
                return jobId + "任務執行結果";
            }, executorService);
@Override
@Async
@Transactional(rollbackFor = Exception.class)//事務
public CompletableFuture<String> insertBatch(File[] tempList, int start, int end, String toDay) throws Exception {
    Date now = new Date();
    long startTime = System.currentTimeMillis();
    ArrayList<TRuVoiceFile> tRuVoiceFiles = new ArrayList<>();
    //檔案拷貝
    for (int i = start; i < end; i++) {
        if (tempList[i].isFile()) {
            //把當前檔案轉化成8bit備份到新資料夾
            try{
                transFile(tempList[i], fileAddressPrefix + "trans" + File.separator + toDay + File.separator + tempList[i].getName().replace("wav", "mp3"));
                TRuVoiceFile entity = new TRuVoiceFile();
                //sid,file_name,url,create_time,update_time,version,status
                entity.setSid(SnowFlakeUtil.getId());
                //通過fileName訪問的是轉換後文件地址目錄 /opt/ucc/data/imageserver/voiceFile/ trans/ yyyy-MM-dd / fileName.mp3
                entity.setFileName(tempList[i].getName().replace("wav", "mp3"));
                entity.setUrl(httpFileAddress + toDay + File.separator + tempList[i].getName());
                entity.setCreateTime(now);
                entity.setUpdateTime(now);
                entity.setVersion(1);
                entity.setStatus("initial");
                tRuVoiceFiles.add(entity);
            }catch (FileAlreadyExistsException e){
                continue;
            }

        }
    }
    //批量插入
    tRuVoiceFileService.addUpateBatch(tRuVoiceFiles, now);
    long endTime = System.currentTimeMillis();
    return CompletableFuture.completedFuture( "完成任務"+Thread.currentThread()+"用時:"+String.valueOf(endTime-startTime)+"ms");

}

on duplicate key update mybatis寫法設定的file_name為唯一索引,如果file_name有衝突則更新

version = version+1,指定是讓資料庫的值+1 如果寫成 version = value(version)+1,則是insert中的值+1

<insert id="insertBatch" parameterType="java.util.List">
        insert into t_ru_voice_file
        (sid,file_name,url,create_time,version,status)
        values
        <foreach collection="list" item="item" separator=",">
           (#{item.sid},#{item.fileName},#{item.url},#{item.createTime},#{item.version},#{item.status})
        </foreach>
        on duplicate key update
        version = version+1,
        update_time = now()
    </insert>