多執行緒程式設計例項(使用CompletableFuture)
阿新 • • 發佈:2021-01-25
關鍵配置:
/**
* 代理類物件
*/
YrSyncWrService yrSyncWrServiceProxy;
// 獲取代理類的物件,呼叫本類方法時,註解才會生效。比如@Async多執行緒,@Transactional事務控制
yrSyncWrServiceProxy = SpringUtil.getBean(YrSyncWrServiceImpl.class);
// 建立指定多執行緒處理批次數大小陣列 CompletableFuture<Map<String, Integer>>[] completableFutureArr = new CompletableFuture[batchNums]; CompletableFuture<Map<String, Integer>> completableFuture; // 每次遍歷數量為 batchQueryNum 的記錄,分批進行多執行緒處理 for (int batchNum = 0; batchNum < batchNums; batchNum++) { // 如果非同步標識沒值或為false if (asyncFlag == null || !asyncFlag) { // 同步指定型別網元(沒有用代理類呼叫本類方法,則多執行緒不起作用,視為單執行緒處理) completableFuture = syncOneNetEleTypeByBatchNum(netEleClass, metaWrFieldMapList, syncNetEleType, batchNum, netElementNameList, lastUpdateTime); } else { // 按批次號同步指定型別網元(多執行緒非同步處理)//非同步標誌.true:非同步;false或NULL:同步. completableFuture = yrSyncWrServiceProxy.syncOneNetEleTypeByBatchNum(netEleClass, metaWrFieldMapList, syncNetEleType, batchNum, netElementNameList, lastUpdateTime); } completableFutureArr[batchNum] = completableFuture; } // 等待所有任務都執行完 CompletableFuture.allOf(completableFutureArr).join(); CompletableFuture<Map<String, Integer>> cf; // 統計成功條數 for (int i = 0; i < completableFutureArr.length; i++) { cf = completableFutureArr[i]; Map<String, Integer> resultMap = null; try { resultMap = cf.get(); } catch (InterruptedException e) { wrAlarmTipsLogService.generateAlarmTipsLog("wr_sync_finish_" + syncNetEleType + "_add,error", syncNetEleType, e.getMessage(), "luhuiping"); Thread.currentThread().interrupt();//捕獲到InterruptedException異常後恢復中斷狀態 log.info("sync_fail (600002) 綜資同步到無線資源,正在同步當前網元【{}】,異常錯誤資訊【{}】", syncNetEleType ,e.getMessage()); e.printStackTrace(); } catch (ExecutionException e) { log.info("sync_fail (600003) 綜資同步到無線資源,正在同步當前網元【{}】,異常錯誤資訊【{}】", syncNetEleType, e.getMessage()); e.printStackTrace(); } if (!ObjectUtils.isNullObj(resultMap) && !resultMap.isEmpty()) { succCnt += resultMap.get("succCnt"); // updateCnt += resultMap.get("updateCnt"); excludeCnt += resultMap.get("excludeCnt"); succFailCnt += resultMap.get("succFailCnt"); // updateFailCnt += resultMap.get("updateFailCnt"); } }
/** * 按批次號同步指定型別網元(多執行緒非同步處理) * * @param * @return java.util.concurrent.CompletableFuture<java.lang.String> * @author liangzhaolin * @date 2020/7/28 16:45 */ // @Async // @Async("syncOneNetEleTypeExecutor") @Override public CompletableFuture<Map<String, Integer>> syncOneNetEleTypeByBatchNum(Class<?> netEleClass, List<MetaWrFieldMap> metaWrFieldMapList, String netEleType, int batchNum, List<String> netElementNameList, String lastUpdateTime) { try { long startTime = System.currentTimeMillis(); // 開始行數 // int startRow = (batchNum * batchQueryNum) + 1; int startRow = (batchNum * BATCH_QUERY_NUM) + 1; // 結束行數 // int endRow = (batchNum + 1) * batchQueryNum; int endRow = (batchNum + 1) * BATCH_QUERY_NUM; // 處理結果 新增 int resultAdd = 0; //更新 int resultUpdate = 0; //不處理結果 int resultExclude = 0; // 新增成功行數 int resultAddFail = 0; //不處理條數 Station int resultUpdateFail = 0; // 排列順序.0:正序;1:倒序.預設0. int sortType = 0; List<Map<String, Object>> yrInfoMapList; // 獲取實體類名稱 String netEleClassName = netEleClass.getSimpleName(); // 獲取對應網元mapper mapper = (BaseMapper) SpringUtil.getBean(StringUtils.firstToLowerCase(netEleClassName) + "Mapper"); // 查詢指定行數區間的億陽綜資記錄, 指定同步資料 if (netElementNameList != null && netElementNameList.size() > 0) { yrInfoMapList = yyzzService.queryYrIncSyncInfo(netEleType, lastUpdateTime, sortType, startRow, endRow, netElementNameList, null); }else{ yrInfoMapList = yyzzService.queryYrIncSyncInfo(netEleType, lastUpdateTime, sortType, startRow, endRow, null, null); } if(yrInfoMapList == null || yrInfoMapList.size() <= 0){ log.info("sync_fail (600004) 綜資同步到無線資源,正在同步當前網元【{}】, 查詢綜資資料為【{}】", netEleType ,yrInfoMapList); } log.info("sync_info (500008) 綜資同步到無線資源,正在同步當前網元【{}】, 查詢綜資資料為【{}】", netEleType ,yrInfoMapList.toString()); // 遍歷每條億陽綜資查詢資料進行處理 for (Map<String, Object> yrInfoMap : yrInfoMapList) { log.info("sync_info (500009) 綜資同步到無線資源,正在同步當前網元【{}】, 查詢綜資遍歷資料【{}】", netEleType, yrInfoMap); try { //當前資料是否存在無線資源庫中,如果存在,綜資的最後修改時間是否大於無線資源的最後修改時間 //大於則修改 int saveOrExclude = wrSyncDateUpdateConfirm(yrInfoMap, netEleType); log.info("sync_info (500010) 綜資同步到無線資源,正在同步當前網元【{}】, 判斷新增或修改 1:需要修改 2:不操作 3:新增 4:不操作【{}】", netEleType, saveOrExclude); // //1:不操作 0:新增 if(saveOrExclude == 0){ //0:新增 並插入日誌 resultAdd = syncOneNetEleTypeWithTransaction(netEleClass, netEleType, metaWrFieldMapList, yrInfoMap, saveOrExclude); if(resultAdd == 0){ resultAddFail++; log.info("sync_fail (600006) 綜資同步到無線資源,正在同步當前網元【{}】, 資料同步新增失敗【{}】,新增失敗狀態【{}】", netEleType ,yrInfoMap ,saveOrExclude); }else if(resultAdd == 1){ //新增成功把綜資id儲存到map集合中 if("station".equals(netEleType)){ gatherStationMap.put(String.valueOf(yrInfoMap.get("空間資源ID")), String.valueOf(yrInfoMap.get("空間資源名稱"))); }if("room".equals(netEleType)){ gatherRoomMap.put(String.valueOf(yrInfoMap.get("空間資源ID")), String.valueOf(yrInfoMap.get("空間資源名稱"))); } log.info("sync_info (600601) 綜資同步到無線資源新增成功,正在同步當前網元【{}】,空間資源ID【{}】,空間資源名稱【{}】", netEleType, String.valueOf(yrInfoMap.get("空間資源ID")), String.valueOf(yrInfoMap.get("空間資源名稱"))); resultAdd++; } }else if(saveOrExclude == 1){ //不處理(不符合新增或修改要求) resultExclude++; } } catch (Exception e) { if (e instanceof DuplicateKeyException) { log.info("sync_fail (600007) 綜資同步到無線資源,正在同步當前網元【{}】,資料同步新增失敗-1【{}】,異常錯誤資訊【{}】", netEleType ,yrInfoMap ,e.getMessage()); // do nothing // 如果入庫報錯(目前估計可能引起入庫報錯的情況只有,重跑程式時,插入wr_yr_sync_log日誌表時會有衝突而引起的報錯回滾),do nothing,keep running } else { log.info("sync_fail (600008) 綜資同步到無線資源,正在同步當前網元【{}】,資料同步新增失敗-2【{}】,異常錯誤資訊【{}】", netEleType, yrInfoMap, e.getMessage()); } } } Map<String, Integer> cfMap = new HashMap<>(); cfMap.put("succCnt", resultAdd);//處理條數 cfMap.put("excludeCnt", resultExclude);//不處理條數 cfMap.put("succFailCnt", resultAddFail);//更新失敗條數 // 返回本批處理的記錄成功行數和總行數 CompletableFuture<Map<String, Integer>> completableFuture = CompletableFuture.completedFuture(cfMap); long endTime = System.currentTimeMillis(); log.info("sync_into_test (1100001) 綜資同步到無線資源,當前網元同步網元【{}】,同步5000條耗時【{}】毫秒," + "查詢分頁開始【{}】,查詢分頁結束【{}】,同步時間範圍【{}】", netEleType, endTime - startTime, startRow, endRow, lastUpdateTime); return completableFuture; } catch (Exception e) { log.info("sync_fail (6100009) 綜資同步到無線資源,正在同步當前網元【{}】,異常錯誤資訊【{}】", netEleType, e.getMessage()); e.printStackTrace(); } return null; }
@Configuration @Slf4j public class ThreadExecutorConfig { /** * 配置億陽綜資同步無線資源介面執行緒池 * * @param * @return java.util.concurrent.Executor * @author liangzhaolin * @date 2020/8/2 17:31 */ @Bean("yrSyncWrExecutor") public Executor yrSyncWrExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心執行緒數:執行緒池建立時候初始化的執行緒數 executor.setCorePoolSize(30); // 最大執行緒數:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒 executor.setMaxPoolSize(300); // 緩衝佇列:用來緩衝執行任務的佇列 executor.setQueueCapacity(600); // 允許執行緒的空閒時間60秒:當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬 executor.setKeepAliveSeconds(60); // 執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池 executor.setThreadNamePrefix("YrSyncWrThread-"); // 緩衝佇列滿了之後的拒絕策略:由呼叫執行緒處理(一般是主執行緒),這裡是DiscardPolicy丟棄策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.initialize(); return executor; } /** * 配置實時無線同步綜資介面執行緒池 * * @param * @return java.util.concurrent.Executor * @author linlianghong * @date 2020/11/13 */ @Bean("realYrSyncWrExecutor") public Executor realYrSyncWrExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心執行緒數:執行緒池建立時候初始化的執行緒數 executor.setCorePoolSize(10); // 最大執行緒數:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒 executor.setMaxPoolSize(15); // 緩衝佇列:用來緩衝執行任務的佇列 executor.setQueueCapacity(600); // 允許執行緒的空閒時間60秒:當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬 executor.setKeepAliveSeconds(60); // 執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池 executor.setThreadNamePrefix("RealYrSyncWrThread-"); // 緩衝佇列滿了之後的拒絕策略:由呼叫執行緒處理(一般是主執行緒),這裡是DiscardPolicy丟棄策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.initialize(); return executor; } /** * 配置執行緒池 * ThreadPoolTaskExecutor的處理流程:當池子大小小於corePoolSize,就新建執行緒,並處理請求 * 當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒執行緒就去workQueue中取任務並處理 * 當workQueue放不下任務時,就新建執行緒入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理 * 當池子的執行緒數大於corePoolSize時,多餘的執行緒會等待keepAliveTime長時間,如果無請求可處理就自行銷燬 * 其會優先建立corePoolSize執行緒,當繼續增加執行緒時,先放入Queue中,當 CorePoolSiz 和 Queue 都滿的時候,就增加建立新執行緒,當執行緒達到MaxPoolSize的時候,就會丟擲錯 * 另外MaxPoolSize的設定如果比系統支援的執行緒數還要大時,會丟擲java.lang.OutOfMemoryError: unable to create new native thread 異常。 * * @author liangzhaolin * @date 2020/7/24 14:43 * @see "https://blog.csdn.net/csdn_pangxiong/article/details/103731613" * @see "https://www.jianshu.com/p/14bde4e6f747" */ @Bean("syncOneNetEleTypeExecutor") public Executor syncOneNetEleTypeExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心執行緒數:執行緒池建立時候初始化的執行緒數 executor.setCorePoolSize(30); // 最大執行緒數:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒 executor.setMaxPoolSize(300); // 緩衝佇列:用來緩衝執行任務的佇列 executor.setQueueCapacity(600); // 允許執行緒的空閒時間60秒:當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬 executor.setKeepAliveSeconds(60); // 執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池 executor.setThreadNamePrefix("SyncOneNetEleTypeThread-"); // 緩衝佇列滿了之後的拒絕策略:由呼叫執行緒處理(一般是主執行緒),這裡是DiscardPolicy丟棄策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.initialize(); return executor; } }