1. 程式人生 > 其它 >多執行緒程式設計例項(使用CompletableFuture)

多執行緒程式設計例項(使用CompletableFuture)

技術標籤:# 多執行緒多執行緒程式設計Future

關鍵配置:

    /**
     * 代理類物件
     */
    YrSyncWrService yrSyncWrServiceProxy;

// 獲取代理類的物件,呼叫本類方法時,註解才會生效。比如@Async多執行緒,@Transactional事務控制
yrSyncWrServiceProxy = SpringUtil.getBean(YrSyncWrServiceImpl.class);
                // 建立指定多執行緒處理批次數大小陣列
                CompletableFuture<Map<String, Integer>>[] completableFutureArr = new CompletableFuture[batchNums];

                CompletableFuture<Map<String, Integer>> completableFuture;

                // 每次遍歷數量為 batchQueryNum 的記錄,分批進行多執行緒處理
                for (int batchNum = 0; batchNum < batchNums; batchNum++) {
                    // 如果非同步標識沒值或為false
                    if (asyncFlag == null || !asyncFlag) {
                        // 同步指定型別網元(沒有用代理類呼叫本類方法,則多執行緒不起作用,視為單執行緒處理)
                        completableFuture = syncOneNetEleTypeByBatchNum(netEleClass, metaWrFieldMapList, syncNetEleType, batchNum, netElementNameList, lastUpdateTime);
                    } else {
                        // 按批次號同步指定型別網元(多執行緒非同步處理)//非同步標誌.true:非同步;false或NULL:同步.
                        completableFuture = yrSyncWrServiceProxy.syncOneNetEleTypeByBatchNum(netEleClass, metaWrFieldMapList, syncNetEleType, batchNum, netElementNameList, lastUpdateTime);
                    }
                    completableFutureArr[batchNum] = completableFuture;
                }

                // 等待所有任務都執行完
                CompletableFuture.allOf(completableFutureArr).join();

                CompletableFuture<Map<String, Integer>> cf;

                // 統計成功條數
                for (int i = 0; i < completableFutureArr.length; i++) {
                    cf = completableFutureArr[i];
                    Map<String, Integer> resultMap = null;
                    try {
                        resultMap = cf.get();
                    } catch (InterruptedException e) {
                        wrAlarmTipsLogService.generateAlarmTipsLog("wr_sync_finish_" + syncNetEleType + "_add,error", syncNetEleType, e.getMessage(), "luhuiping");
                        Thread.currentThread().interrupt();//捕獲到InterruptedException異常後恢復中斷狀態
                        log.info("sync_fail (600002) 綜資同步到無線資源,正在同步當前網元【{}】,異常錯誤資訊【{}】", 
                                syncNetEleType ,e.getMessage());
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        log.info("sync_fail (600003) 綜資同步到無線資源,正在同步當前網元【{}】,異常錯誤資訊【{}】", 
                                syncNetEleType, e.getMessage());
                        e.printStackTrace();
                    }

                    if (!ObjectUtils.isNullObj(resultMap) && !resultMap.isEmpty()) {
                        succCnt += resultMap.get("succCnt");
//                        updateCnt += resultMap.get("updateCnt");
                        excludeCnt += resultMap.get("excludeCnt");
                        succFailCnt += resultMap.get("succFailCnt");
//                        updateFailCnt += resultMap.get("updateFailCnt");
                    }
                }
/**
     * 按批次號同步指定型別網元(多執行緒非同步處理)
     *
     * @param
     * @return java.util.concurrent.CompletableFuture<java.lang.String>
     * @author liangzhaolin
     * @date 2020/7/28 16:45
     */
//    @Async
//    @Async("syncOneNetEleTypeExecutor")
    @Override
    public CompletableFuture<Map<String, Integer>> syncOneNetEleTypeByBatchNum(Class<?> netEleClass, List<MetaWrFieldMap> metaWrFieldMapList, String netEleType, int batchNum, List<String> netElementNameList, String lastUpdateTime) {

        try {
            long startTime = System.currentTimeMillis();

            // 開始行數
//        int startRow = (batchNum * batchQueryNum) + 1;
            int startRow = (batchNum * BATCH_QUERY_NUM) + 1;
            // 結束行數
//        int endRow = (batchNum + 1) * batchQueryNum;
            int endRow = (batchNum + 1) * BATCH_QUERY_NUM;
            // 處理結果 新增
            int resultAdd = 0;
            //更新
            int resultUpdate = 0;
            //不處理結果
            int resultExclude = 0;

            // 新增成功行數 
            int resultAddFail = 0;
            //不處理條數 Station
            int resultUpdateFail = 0;


            // 排列順序.0:正序;1:倒序.預設0.
            int sortType = 0;
            List<Map<String, Object>> yrInfoMapList;
            // 獲取實體類名稱
            String netEleClassName = netEleClass.getSimpleName();
            // 獲取對應網元mapper
            mapper = (BaseMapper) SpringUtil.getBean(StringUtils.firstToLowerCase(netEleClassName) + "Mapper");

            // 查詢指定行數區間的億陽綜資記錄, 指定同步資料
            if (netElementNameList != null && netElementNameList.size() > 0) {
                yrInfoMapList = yyzzService.queryYrIncSyncInfo(netEleType, lastUpdateTime, sortType, startRow, endRow, netElementNameList, null);
            }else{
                yrInfoMapList = yyzzService.queryYrIncSyncInfo(netEleType, lastUpdateTime, sortType, startRow, endRow, null, null);
            }

            if(yrInfoMapList == null || yrInfoMapList.size() <= 0){
                log.info("sync_fail (600004) 綜資同步到無線資源,正在同步當前網元【{}】, 查詢綜資資料為【{}】", 
                        netEleType ,yrInfoMapList);
            }
            log.info("sync_info (500008) 綜資同步到無線資源,正在同步當前網元【{}】, 查詢綜資資料為【{}】", 
                    netEleType ,yrInfoMapList.toString());
            // 遍歷每條億陽綜資查詢資料進行處理
            for (Map<String, Object> yrInfoMap : yrInfoMapList) {
                log.info("sync_info (500009) 綜資同步到無線資源,正在同步當前網元【{}】, 查詢綜資遍歷資料【{}】", netEleType, yrInfoMap);
                try {
                    //當前資料是否存在無線資源庫中,如果存在,綜資的最後修改時間是否大於無線資源的最後修改時間
                    //大於則修改
                    int saveOrExclude = wrSyncDateUpdateConfirm(yrInfoMap, netEleType);
                    log.info("sync_info (500010) 綜資同步到無線資源,正在同步當前網元【{}】, 判斷新增或修改 1:需要修改 2:不操作 3:新增 4:不操作【{}】", netEleType, saveOrExclude);
    //                //1:不操作  0:新增
                    if(saveOrExclude == 0){
                        //0:新增  並插入日誌
                        resultAdd = syncOneNetEleTypeWithTransaction(netEleClass, netEleType, metaWrFieldMapList, yrInfoMap, saveOrExclude);
                        if(resultAdd == 0){
                            resultAddFail++;
                            log.info("sync_fail (600006) 綜資同步到無線資源,正在同步當前網元【{}】, 資料同步新增失敗【{}】,新增失敗狀態【{}】", 
                                    netEleType ,yrInfoMap ,saveOrExclude);
                        }else if(resultAdd == 1){
                            //新增成功把綜資id儲存到map集合中
                            if("station".equals(netEleType)){
                                gatherStationMap.put(String.valueOf(yrInfoMap.get("空間資源ID")), String.valueOf(yrInfoMap.get("空間資源名稱")));
                            }if("room".equals(netEleType)){
                                gatherRoomMap.put(String.valueOf(yrInfoMap.get("空間資源ID")), String.valueOf(yrInfoMap.get("空間資源名稱")));
                            }
                            log.info("sync_info (600601) 綜資同步到無線資源新增成功,正在同步當前網元【{}】,空間資源ID【{}】,空間資源名稱【{}】",
                                    netEleType, String.valueOf(yrInfoMap.get("空間資源ID")), String.valueOf(yrInfoMap.get("空間資源名稱")));
                            resultAdd++;
                        }
                    }else if(saveOrExclude == 1){
                        //不處理(不符合新增或修改要求)
                        resultExclude++;
                    }
                } catch (Exception e) {
                    if (e instanceof DuplicateKeyException) {
                        log.info("sync_fail (600007) 綜資同步到無線資源,正在同步當前網元【{}】,資料同步新增失敗-1【{}】,異常錯誤資訊【{}】",
                                netEleType ,yrInfoMap ,e.getMessage());
                        // do nothing
                        // 如果入庫報錯(目前估計可能引起入庫報錯的情況只有,重跑程式時,插入wr_yr_sync_log日誌表時會有衝突而引起的報錯回滾),do nothing,keep running
                    } else {
                        log.info("sync_fail (600008) 綜資同步到無線資源,正在同步當前網元【{}】,資料同步新增失敗-2【{}】,異常錯誤資訊【{}】", 
                                netEleType, yrInfoMap, e.getMessage());
                    }
                }
            }

            Map<String, Integer> cfMap = new HashMap<>();
            cfMap.put("succCnt", resultAdd);//處理條數
            cfMap.put("excludeCnt", resultExclude);//不處理條數
            cfMap.put("succFailCnt", resultAddFail);//更新失敗條數

            // 返回本批處理的記錄成功行數和總行數
            CompletableFuture<Map<String, Integer>> completableFuture = CompletableFuture.completedFuture(cfMap);

            long endTime = System.currentTimeMillis();
    
            log.info("sync_into_test (1100001) 綜資同步到無線資源,當前網元同步網元【{}】,同步5000條耗時【{}】毫秒," +
                    "查詢分頁開始【{}】,查詢分頁結束【{}】,同步時間範圍【{}】", netEleType, endTime - startTime, 
                    startRow, endRow, lastUpdateTime);

            return completableFuture;
        } catch (Exception e) {
            log.info("sync_fail (6100009) 綜資同步到無線資源,正在同步當前網元【{}】,異常錯誤資訊【{}】",
                    netEleType, e.getMessage());
            e.printStackTrace();
        }
        
        return null;
    }
@Configuration
@Slf4j
public class ThreadExecutorConfig {

    /**
     * 配置億陽綜資同步無線資源介面執行緒池
     *
     * @param
     * @return java.util.concurrent.Executor
     * @author liangzhaolin
     * @date 2020/8/2 17:31
     */
    @Bean("yrSyncWrExecutor")
    public Executor yrSyncWrExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 核心執行緒數:執行緒池建立時候初始化的執行緒數
        executor.setCorePoolSize(30);
        // 最大執行緒數:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒
        executor.setMaxPoolSize(300);
        // 緩衝佇列:用來緩衝執行任務的佇列
        executor.setQueueCapacity(600);
        // 允許執行緒的空閒時間60秒:當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬
        executor.setKeepAliveSeconds(60);
        // 執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池
        executor.setThreadNamePrefix("YrSyncWrThread-");
        // 緩衝佇列滿了之後的拒絕策略:由呼叫執行緒處理(一般是主執行緒),這裡是DiscardPolicy丟棄策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        executor.initialize();

        return executor;
    }


    /**
     * 配置實時無線同步綜資介面執行緒池
     *
     * @param
     * @return java.util.concurrent.Executor
     * @author linlianghong
     * @date 2020/11/13
     */
    @Bean("realYrSyncWrExecutor")
    public Executor realYrSyncWrExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 核心執行緒數:執行緒池建立時候初始化的執行緒數
        executor.setCorePoolSize(10);
        // 最大執行緒數:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒
        executor.setMaxPoolSize(15);
        // 緩衝佇列:用來緩衝執行任務的佇列
        executor.setQueueCapacity(600);
        // 允許執行緒的空閒時間60秒:當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬
        executor.setKeepAliveSeconds(60);
        // 執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池
        executor.setThreadNamePrefix("RealYrSyncWrThread-");
        // 緩衝佇列滿了之後的拒絕策略:由呼叫執行緒處理(一般是主執行緒),這裡是DiscardPolicy丟棄策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        executor.initialize();

        return executor;
    }


    /**
     * 配置執行緒池
     * ThreadPoolTaskExecutor的處理流程:當池子大小小於corePoolSize,就新建執行緒,並處理請求
     * 當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒執行緒就去workQueue中取任務並處理
     * 當workQueue放不下任務時,就新建執行緒入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
     * 當池子的執行緒數大於corePoolSize時,多餘的執行緒會等待keepAliveTime長時間,如果無請求可處理就自行銷燬
     * 其會優先建立corePoolSize執行緒,當繼續增加執行緒時,先放入Queue中,當 CorePoolSiz  和 Queue 都滿的時候,就增加建立新執行緒,當執行緒達到MaxPoolSize的時候,就會丟擲錯
     * 另外MaxPoolSize的設定如果比系統支援的執行緒數還要大時,會丟擲java.lang.OutOfMemoryError: unable to create new native thread 異常。
     *
     * @author liangzhaolin
     * @date 2020/7/24 14:43
     * @see "https://blog.csdn.net/csdn_pangxiong/article/details/103731613"
     * @see "https://www.jianshu.com/p/14bde4e6f747"
     */
    @Bean("syncOneNetEleTypeExecutor")
    public Executor syncOneNetEleTypeExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 核心執行緒數:執行緒池建立時候初始化的執行緒數
        executor.setCorePoolSize(30);
        // 最大執行緒數:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒
        executor.setMaxPoolSize(300);
        // 緩衝佇列:用來緩衝執行任務的佇列
        executor.setQueueCapacity(600);
        // 允許執行緒的空閒時間60秒:當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬
        executor.setKeepAliveSeconds(60);
        // 執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池
        executor.setThreadNamePrefix("SyncOneNetEleTypeThread-");
        // 緩衝佇列滿了之後的拒絕策略:由呼叫執行緒處理(一般是主執行緒),這裡是DiscardPolicy丟棄策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        executor.initialize();

        return executor;
    }
}