CompletableFuture 多執行緒阻塞獲取結果
阿新 • • 發佈:2021-08-12
voidCompletableFuture.get();
voidCompletableFuture.join();
兩者一樣會阻塞當前執行緒,直到所有子任務都完成後,再一起列印結果(差別在於 get() 丟出受檢例外,join() 丟出非受檢的 CompletionException)
package com.async;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * Demonstrates fanning jobs out to a fixed-size thread pool with {@link CompletableFuture}
 * and blocking the calling thread until every job has finished before printing the results.
 *
 * <p>Created 2021/8/12 20:23.
 *
 * @author Allen
 * @version 1.0
 */
public class AsyncTest {

    /**
     * Dispatches the jobs, waits for all of them, prints each result and finally prints the
     * elapsed wall-clock time in milliseconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println();
        long startedAt = System.currentTimeMillis();

        // Job list.
        List<Integer> jobList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
        int nThreads = 3;
        // Worker pool.
        ExecutorService executorService = new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            // One future per job, kept in submission order.
            List<CompletableFuture<String>> completableFutures = new ArrayList<>(jobList.size());

            // Dispatch the jobs.
            for (Integer jobId : jobList) {
                try {
                    completableFutures.add(
                            CompletableFuture.supplyAsync(() -> runJob(jobId), executorService));
                } catch (RejectedExecutionException e) {
                    // Report the job that could not be queued instead of silently dropping it.
                    e.printStackTrace();
                }
            }

            // Wait for every job to finish.
            CompletableFuture<Void> voidCompletableFuture =
                    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]));
            try {
                voidCompletableFuture.get();
                // voidCompletableFuture.join() would block in the same way.
                for (CompletableFuture<String> finished : completableFutures) {
                    // Every future is already complete here, so join() returns immediately.
                    System.out.println(finished.join());
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        } finally {
            // Always release the non-daemon worker threads so the JVM can exit.
            executorService.shutdown();
        }
        System.out.println(System.currentTimeMillis() - startedAt);
    }

    /** Simulates one second of work for the given job and returns its result text. */
    private static String runJob(Integer jobId) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return jobId + "任務執行結果";
    }
}
非同步多執行緒配置
在這個配置類裡也可以新增以 @Bean 註解返回的執行緒池;未指定名稱時,@Async 預設使用 getAsyncExecutor 方法返回的執行緒池
@EnableAsync @Configuration public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix("Anno-Executor"); //指定用於新建立的執行緒名稱的字首。 executor.setCorePoolSize(10); //核心執行緒數 executor.setMaxPoolSize(20); //最大執行緒數 executor.setQueueCapacity(1000); //佇列大小 executor.setKeepAliveSeconds(300); //執行緒最大空閒時間 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略(一共四種,此處省略) executor.initialize(); return executor; } // 異常處理器:當然你也可以自定義的,這裡我就這麼簡單寫了~~~ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new SimpleAsyncUncaughtExceptionHandler(); } }
例如下面的配置:如果需要指定使用其他的執行緒池,可以寫成 @Async("taskExecutor"),並搭配自定義的異常處理方法
@Configuration @EnableAsync @EnableScheduling public class AsyncConfiguration implements AsyncConfigurer { private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class); @Override @Bean(name = "taskExecutor") public Executor getAsyncExecutor() { log.debug("Creating Async Task Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); //核心執行緒數 executor.setMaxPoolSize(20); //最大執行緒數 executor.setQueueCapacity(1000); //佇列大小 executor.setKeepAliveSeconds(300); //執行緒最大空閒時間 executor.setThreadNamePrefix("ics-Executor-"); 指定用於新建立的執行緒名稱的字首。 executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略 return new ExceptionHandlingAsyncTaskExecutor(executor); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new MyAsyncExceptionHandler(); } class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... objects) { log.info("Exception message - " + throwable.getMessage()); log.info("Method name - " + method.getName()); for (Object param : objects) { log.info("Parameter value - " + param); } } } }
開發程式碼例子
/**
 * Processes every file found in the directory {@code fileAddressPrefix + date} in batches of
 * four, waits for all batches and prints the concatenated batch results.
 *
 * <p>The original note on this endpoint says it is executed once every night at 10 pm; the
 * trigger is not visible in this method - TODO confirm.
 *
 * <p>Each batch is handed to {@code addFileBatchService.insertBatch}, which is declared
 * {@code @Async} and therefore returns its future immediately. The futures are collected on
 * the calling thread only, so no latch or extra executor hop is needed and the list is never
 * touched by more than one thread.
 *
 * @param date name of the sub-directory to process; it comes from the request, so values that
 *             could escape {@code fileAddressPrefix} are rejected
 */
@RequestMapping("/getUrlsByDate")
public void getUrlsByDate(String date) {
    log.info("指定日期獲取檔案");
    // The value is concatenated into a file-system path: refuse anything that is missing or
    // could point outside the configured prefix.
    if (date == null || date.isEmpty()
            || date.contains("..") || date.contains("/") || date.contains("\\")) {
        log.warn("Rejected invalid date parameter: " + date);
        return;
    }
    String toDay = date;
    File file = new File(fileAddressPrefix + toDay);
    log.info("檔案轉換:" + (fileAddressPrefix + toDay));
    File[] tempList = file.listFiles();
    if (tempList == null) {
        // Not a directory, or it could not be listed: nothing to do.
        return;
    }
    log.info("檔案轉換檔案個數:" + (tempList.length));

    int batchSize = 4;
    // Ceiling division: number of batches needed to cover every file.
    int batchCount = (tempList.length + batchSize - 1) / batchSize;
    ArrayList<Future<String>> futures = new ArrayList<>(batchCount);
    for (int batch = 0; batch < batchCount; batch++) {
        int start = batch * batchSize;
        int end = Math.min(start + batchSize, tempList.length);
        try {
            futures.add(addFileBatchService.insertBatch(tempList, start, end, toDay));
        } catch (Exception e) {
            log.error("Failed to dispatch batch [" + start + ", " + end + ") for " + toDay, e);
        }
    }

    StringBuilder summary = new StringBuilder();
    for (Future<String> future : futures) {
        try {
            // Blocks until this batch has really finished.
            summary.append(future.get());
        } catch (InterruptedException e) {
            // Stop waiting, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for batches of " + toDay, e);
            break;
        } catch (Exception e) {
            // One failed batch must not hide the results of the others.
            log.error("Batch failed for " + toDay, e);
        }
    }
    System.out.println("----------------全部完成");
    System.out.println(summary);
}
上面的寫法中 countDownLatch 沒有作用:insertBatch 是 @Async 非同步方法,呼叫後會立即返回,接著就直接走到 finally 執行 countDown(),此時任務其實尚未完成
可以改用下面的寫法:指定對應的執行緒池,再以阻塞方式等待並獲取全部結果
voidCompletableFuture.get(); 或 voidCompletableFuture.join();
// Run the job on the given pool; the future completes with the job's result text.
CompletableFuture<String> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        // Simulate one second of work.
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the pool thread's owner can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return jobId + "任務執行結果";
}, executorService);
/**
 * Converts the regular files in {@code tempList[start, end)} and upserts one record per
 * converted file in a single batch.
 *
 * <p>Runs asynchronously ({@code @Async}) inside a transaction that rolls back on any
 * {@link Exception}.
 *
 * @param tempList files of the day's directory
 * @param start    index of the first file to process (inclusive)
 * @param end      index after the last file to process (exclusive)
 * @param toDay    name of the day's directory, used in the target path and the URL
 * @return an already-completed future carrying a text with the worker thread and elapsed time
 * @throws Exception if converting a file or writing the batch fails
 */
@Override
@Async
@Transactional(rollbackFor = Exception.class)
public CompletableFuture<String> insertBatch(File[] tempList, int start, int end, String toDay) throws Exception {
    Date now = new Date();
    long startTime = System.currentTimeMillis();
    ArrayList<TRuVoiceFile> tRuVoiceFiles = new ArrayList<>();
    // Copy/convert the files.
    for (int i = start; i < end; i++) {
        File source = tempList[i];
        if (!source.isFile()) {
            continue;
        }
        // Only the extension is swapped; "wav" elsewhere in the name is left alone.
        String mp3Name = toMp3Name(source.getName());
        try {
            // Per the original author's note: converts the file to 8 bit and backs it up in
            // the "trans" folder of the day.
            transFile(source, fileAddressPrefix + "trans" + File.separator + toDay + File.separator + mp3Name);
            TRuVoiceFile entity = new TRuVoiceFile();
            // Columns: sid, file_name, url, create_time, update_time, version, status
            entity.setSid(SnowFlakeUtil.getId());
            // Per the original author's note, fileName addresses the converted file, e.g.
            // /opt/ucc/data/imageserver/voiceFile/trans/yyyy-MM-dd/fileName.mp3
            entity.setFileName(mp3Name);
            // URLs always use '/', independent of the platform's file separator.
            // NOTE(review): the URL keeps the original (unconverted) file name - confirm intended.
            entity.setUrl(httpFileAddress + toDay + "/" + source.getName());
            entity.setCreateTime(now);
            entity.setUpdateTime(now);
            entity.setVersion(1);
            entity.setStatus("initial");
            tRuVoiceFiles.add(entity);
        } catch (FileAlreadyExistsException ignored) {
            // The target already exists, so this file is deliberately skipped.
            continue;
        }
    }
    // Batch upsert; skipped when nothing was converted, because an empty list would render an
    // INSERT without any value tuples.
    if (!tRuVoiceFiles.isEmpty()) {
        tRuVoiceFileService.addUpateBatch(tRuVoiceFiles, now);
    }
    long endTime = System.currentTimeMillis();
    return CompletableFuture.completedFuture(
            "完成任務" + Thread.currentThread() + "用時:" + (endTime - startTime) + "ms");
}

/** Swaps a trailing ".wav" extension for ".mp3"; any other name is returned unchanged. */
private static String toMp3Name(String fileName) {
    String wavSuffix = ".wav";
    if (!fileName.endsWith(wavSuffix)) {
        return fileName;
    }
    return fileName.substring(0, fileName.length() - wavSuffix.length()) + ".mp3";
}
on duplicate key update 的 MyBatis 寫法:file_name 設定為唯一索引,如果 file_name 有衝突則改為更新
version = version+1 指定的是讓資料庫中現有的值 +1;如果寫成 version = values(version)+1,則是 insert 語句中傳入的值 +1
<!--
  Batch upsert into t_ru_voice_file: renders one value tuple per list element.
  When a row collides on a unique key (the accompanying notes name file_name as the unique
  index), the existing row is kept and only its version is incremented and its update_time
  is set to the database's current time. "version = version+1" refers to the value already
  stored in the row, not to the value supplied by the insert.
  update_time is not part of the inserted column list; it is only written by the
  on-duplicate-key branch.
  NOTE(review): an empty list would render "values" with no tuples; callers presumably must
  not pass an empty list. Confirm.
-->
<insert id="insertBatch" parameterType="java.util.List">
insert into t_ru_voice_file
(sid,file_name,url,create_time,version,status)
values
<foreach collection="list" item="item" separator=",">
(#{item.sid},#{item.fileName},#{item.url},#{item.createTime},#{item.version},#{item.status})
</foreach>
on duplicate key update
version = version+1,
update_time = now()
</insert>