使用CompletionService結合ExecutorService批處理調用存儲過程任務實例
阿新 • • 發佈:2019-01-05
java多線 ror 查詢 fin lock cal mep work Education
此實例為java多線程並發調用存儲過程實例,只做代碼記載,不做詳細描述
1.線程池構造初始化類CommonExecutorService.java
package com.pupeiyuan.go; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger; public class CommonExecutorService { private static final int CAPACITY = 10000; private static final int CORE_POOL_SIZE = 100; private static final int MAXIMUM_POOL_SIZE = 1000; private static final Long KEEP_ALIVE_TIME = 100L; privateCommonExecutorService() { } public static ExecutorService getExecutorService() { return executorService; } /** * 構造請求線程池,隊列大小為1000 */ private static final ExecutorService executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(CAPACITY), new ThreadFactory() { AtomicInteger poolNumber = new AtomicInteger(1); AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { String namePrefix = String.format("reportPool-%s-thread-", poolNumber.getAndIncrement()); return new Thread(r, namePrefix + threadNumber.getAndIncrement()); } }); }
2.基礎任務類BaseTask並實現Callable
package com.pupeiyuan.go; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.logging.Logger; public abstract class BaseTask implements Callable<Map<String, Object>>{ // protected static final Logger logger = Logger.getLogger(BaseTask.class); /** * 組織ID */ private String orgId; /** * 組織類型 */ private String orgType; /** * 時間維度 */ private String timeDim; /** * 指標列表 */ private List<String> indexIdList; /** * 指標類別 */ private String indexCategory; /** * 指標權限 */ private Set<String> validIndexSet; /** * 存儲過程名稱 */ private String procName; /** * 後臺設置的有效指標集合 */ private Set<String> backendValidIndexSet; /** * 保存任務結果 */ private Map<String, Object> taskResultMap; /** * 記錄日誌用到的userId */ private String userId; public BaseTask() { // TODO Auto-generated constructor stub } @Override public Map<String, Object> call() { List<String> paramList = makeParamList(); ThreadUtils.set(getUserId()); List<Map<String, Object>> retListMap = callProc(paramList); ThreadUtils.remove(); Map<String, Object> retMap = handleBiResult(retListMap); this.setTaskResultMap(retMap); return retMap; } /** * 構造請求參數 * * @return */ public List<String> makeParamList() { List<String> params = new ArrayList<String>(); //uuid to old_id params.add(CommonConstant.getOldIdByUUID(getOrgId(),getOrgType())); params.add(ReportUtils.getBIOrgType(getOrgType())); params.add(getTimeDim()); if (getIndexIdList() != null && getIndexIdList().size() > 0) { params.add(StringUtils.join(getIndexIdList(), ",")); } if (StringUtils.isNotBlank(getIndexCategory())) { params.add(getIndexCategory()); } return params; } private List<Map<String, Object>> callProc(List<String> paramList) { System.out.println("thread:"+Thread.currentThread().getName()); CallProcedureIDao callProcedureIDao = SpringContextHolder.getBean(CallProcedureIDao.class); List<Map<String, Object>> retListMap = null; try { retListMap = 
callProcedureIDao.getCallProcedureResult(getProcName(), paramList); } catch(Exception e) { logger.error(e.getMessage(),e); } return retListMap; } public abstract Map<String, Object> handleBiResult(List<Map<String, Object>> retListMap); /** * @return the orgId */ public String getOrgId() { return orgId; } /** * @param orgId the orgId to set */ public void setOrgId(String orgId) { this.orgId = orgId; } /** * @return the orgType */ public String getOrgType() { return orgType; } /** * @param orgType the orgType to set */ public void setOrgType(String orgType) { this.orgType = orgType; } /** * @return the timeDim */ public String getTimeDim() { return timeDim; } /** * @param timeDim the timeDim to set */ public void setTimeDim(String timeDim) { this.timeDim = timeDim; } /** * @return the indexIdList */ public List<String> getIndexIdList() { return indexIdList; } /** * @param indexIdList the indexIdList to set */ public void setIndexIdList(List<String> indexIdList) { this.indexIdList = indexIdList; } /** * @return the validIndexSet */ public Set<String> getValidIndexSet() { return validIndexSet; } /** * @param validIndexSet the validIndexSet to set */ public void setValidIndexSet(Set<String> validIndexSet) { this.validIndexSet = validIndexSet; } /** * @return the procName */ public String getProcName() { return procName; } /** * @param procName the procName to set */ public void setProcName(String procName) { this.procName = procName; } /** * @return the indexCategory */ public String getIndexCategory() { return indexCategory; } /** * @param indexCategory the indexCategory to set */ public void setIndexCategory(String indexCategory) { this.indexCategory = indexCategory; } /** * @return the statCommonIService */ public StatCommonIService getStatCommonIService() { return SpringContextHolder.getBean(StatCommonIService.class); } /** * @return the sysContactsIService */ public SysContactsIService getSysContactsIService() { return 
SpringContextHolder.getBean(SysContactsIService.class); } /** * @return the backendValidIndexSet */ public Set<String> getBackendValidIndexSet() { return backendValidIndexSet; } /** * @param backendValidIndexSet the backendValidIndexSet to set */ public void setBackendValidIndexSet(Set<String> backendValidIndexSet) { this.backendValidIndexSet = backendValidIndexSet; } /** * @return the taskResultMap */ public Map<String, Object> getTaskResultMap() { return taskResultMap; } /** * @param taskResultMap the taskResultMap to set */ public void setTaskResultMap(Map<String, Object> taskResultMap) { this.taskResultMap = taskResultMap; } /** * @return the userId */ public String getUserId() { return userId; } /** * @param userId the userId to set */ public void setUserId(String userId) { this.userId = userId; } }
3.具體任務類PieTask繼承BaseTask
package com.pupeiyuan.go; public class PieTask extends BaseTask{ public PieTask() { // TODO Auto-generated constructor stub } public Map<String, Object>(List<Map<String, Object>> retListMap) { Map<String, Object> retMap = new HashMap<String, Object>(); if (retListMap != null && retListMap.size() > 0) { List<String> inputIdList = getIndexIdList(); Map<String, Object> map; String indexId; for(int i = 0; i < inputIdList.size(); i++) { indexId = inputIdList.get(i); map = retListMap.get(i); if (map == null) {//空數據處理 retMap.put(indexId,makePieItem(indexId,CommonConstant.INDEX_VALUE_STAR,getValidIndexSet())); } else { String indexValue = (String)map.get("INDEX_VALUE"); retMap.put(indexId,makePieItem(indexId,indexValue,getValidIndexSet())); } } } else {//空數據處理 retMap = makePieFakeData(getIndexIdList(),getValidIndexSet()); } return retMap; } //沒有數據處理 public static HrPieItem makePieItem(String indexId, String indexValue, Set<String> validIndexSet) { HrPieItem item = new HrPieItem(); IndexInfoVo indexObj = CommonConstant.getIndexObj(indexId); if (indexObj == null) { logger.error("ERROR:We dont find this indexId("+indexId+") info."); return item; } //查看權限 boolean bValid = validIndexSet.contains(indexId); bValid = true; item.setIndexId(indexId); String name = indexObj.getName(); String[] items = StringUtils.split(name, "\\|"); if (items != null ) { if (items.length == 1) { item.setIndexName(CommonConstant.getIndexNameWithDp(items[0],bValid)); } else if(items.length == 2) { item.setIndexName(CommonConstant.getIndexNameWithDp(items[0],bValid)); item.setIndexShortName(CommonConstant.getIndexNameWithDp(items[1],bValid)); } } item.setIndexUnit(indexObj.getUnit()); item.setIndexValue(CommonConstant.getIndexValueWithDp(indexValue,bValid)); return item; } //沒有數據處理 public static Map<String, Object> makePieFakeData(List<String> indexIdList, Set<String> validIndexSet) { Map<String, Object> retMap = new HashMap<String, Object>(); HrPieItem item; for (int i = 0 ;i <indexIdList.size(); i++) 
{ String indexId = indexIdList.get(i); item = makePieItem(indexId,CommonConstant.INDEX_VALUE_STAR,validIndexSet); retMap.put(indexId, item); } return retMap; } //轉換扇區百分比 public static void convertPercentage(List<HrPieItem> pieItems) { Double sum = new Double(0); //計算各扇區總和 for (HrPieItem hrPieItem : pieItems) { if(null!=hrPieItem.getIndexValue()&&!"".equals(hrPieItem.getIndexValue())&&!"--".equals(hrPieItem.getIndexValue())) { sum+=Double.parseDouble(hrPieItem.getIndexValue()); } } if(sum>0) { //計算分區所占百分比 for (HrPieItem hrPieItem : pieItems) { if(null!=hrPieItem.getIndexValue()&&!"".equals(hrPieItem.getIndexValue())&&!"--".equals(hrPieItem.getIndexValue())) { double percentage = Double.parseDouble(hrPieItem.getIndexValue())/sum; percentage = (double) Math.round(percentage * 10000)/100; hrPieItem.setPercentage(String.valueOf(percentage)+"%"); } } } }
4.多線程處理觸發類TaskExecutor
package com.pupeiyuan.go;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Runs a batch of {@link BaseTask}s on the shared pool and waits until all have finished.
 */
public class TaskExecutor {

    private static final Logger LOGGER = Logger.getLogger(TaskExecutor.class.getName());

    /** Timeout passed to {@code Future.get}; see the note in {@link #compute(List)}. */
    private static final long RESULT_TIMEOUT_MILLIS = 5000L;

    private static final ExecutorService commonExecutorService = CommonExecutorService.getExecutorService();

    public TaskExecutor() {
    }

    /**
     * Submits every task and blocks until each one has completed or failed.
     *
     * <p>Results are not returned here; callers read them from
     * {@link BaseTask#getTaskResultMap()}. A failing task is logged and does not stop the
     * wait for the remaining tasks. If the calling thread is interrupted, the interrupt
     * flag is restored and the method returns early.
     *
     * @param taskList tasks to run; {@code null} or empty is a no-op
     */
    public static void compute(List<BaseTask> taskList) {
        if (taskList == null || taskList.isEmpty()) {
            return;
        }

        // One completion service per call. The original shared a static instance, so
        // concurrent callers (or futures left behind by an aborted call) could be taken by
        // the wrong batch, letting compute() return before its own tasks were done.
        CompletionService<Map<String, Object>> completionService =
                new ExecutorCompletionService<Map<String, Object>>(commonExecutorService);

        for (BaseTask task : taskList) {
            completionService.submit(task);
        }

        for (int i = 0; i < taskList.size(); i++) {
            try {
                // take() only hands back completed futures, so the timeout below is purely
                // defensive and is not expected to fire.
                Future<Map<String, Object>> future = completionService.take();
                future.get(RESULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.log(Level.WARNING, "Interrupted while waiting for report tasks", e);
                return;
            } catch (ExecutionException e) {
                LOGGER.log(Level.SEVERE, "Report task failed", e.getCause());
            } catch (TimeoutException e) {
                LOGGER.log(Level.SEVERE, "Report task result not available in time", e);
            }
        }
    }
}
5.調用存儲過程實現類CallProcedureDaoImpl
package com.pupeiyuan.go; public class CallProcedureDaoImpl implements CallProcedureIDao { protected final Log logger = LogFactory.getLog(this.getClass()); private JdbcTemplate biJdbcTemplate; private JdbcTemplate jdbcTemplate; /** * * @param procedureName 存儲過程名稱包含預編譯參數 {call testpro(?,?)} 最後一個參數為輸出參數為遊標類型 * @param params 對應存儲過程執行參數是有順序的,參數類型為字符類型 不包含存儲的輸出參數 * @return */ @Override @SuppressWarnings("unchecked") public List<Map<String,Object>> getCallProcedureResult(final String procedureName, final List<String> params) { final String userId = (String) ThreadUtils.get(); final long startTime = System.currentTimeMillis(); //logger.info("開始調用存儲過程【"+procedureName+"】,入參【" + JSON.toJSONString(params) + "】"); List<Map<String,Object>> resultList = null; try { resultList = (List<Map<String,Object>>) biJdbcTemplate.execute( new CallableStatementCreator() { public CallableStatement createCallableStatement(Connection con) throws SQLException { String storedProc = procedureName;// 調用的sql CallableStatement cs = con.prepareCall(storedProc); for (int i=0; i< params.size();i++) { cs.setString((i+1), params.get(i));// 設置輸入參數的值 } cs.registerOutParameter((params.size()+1),OracleTypes.CURSOR);// 註冊輸出參數的類型 return cs; } }, new CallableStatementCallback() { public Object doInCallableStatement(CallableStatement cs) throws SQLException,DataAccessException { List<Map<String,Object>> resultsMap = new ArrayList<Map<String,Object>>(); cs.execute(); ResultSet rs = (ResultSet) cs.getObject((params.size()+1));;// 此處值必須跟遊標返回的值下標是統一個下標 if (rs!=null) { ResultSetMetaData rsmd = rs.getMetaData(); List<String> columNames = new ArrayList<String>(); for(int i=1; i<= rsmd.getColumnCount(); i++){ columNames.add(rsmd.getColumnName(i)); //將字段名放在List中 } if (!CollectionUtils.isEmpty(columNames)) { while (rs.next()) {// 轉換每行的返回值到Map中 Map<String,Object> rowMap = new HashMap<String,Object>(); for (String columName : columNames) { rowMap.put(columName, rs.getObject(columName)); } resultsMap.add(rowMap); } } 
rs.close(); } return resultsMap; } }); final long endTime = System.currentTimeMillis(); // logger.info("結束調用存儲過程【"+procedureName+"】,入參【"+ JSON.toJSONString(params) + "】,查詢存儲過程返回數據條數【"+resultList.size()+"】總耗時:" + (endTime-startTime) + "毫秒"); // logger.info("本次調用存儲過程返回數據:"+JSON.toJSONString(resultList)); final List<Map<String, Object>> finalResultList = resultList; jdbcTemplate.update("INSERT INTO PROCEDURE_LOGS VALUES(?,?,?,?,?,?,?,?)", new PreparedStatementSetter() { public void setValues(PreparedStatement ps) throws SQLException { ps.setString(1, JugHelper.generalUUID()); ps.setString(2, procedureName); ps.setString(3, JSON.toJSONString(params)); ps.setString(4, JSON.toJSONString(finalResultList)); ps.setTimestamp(5, new Timestamp(new Date().getTime())); ps.setInt(6, Integer.valueOf((endTime-startTime)+"")); ps.setString(7, "1");// 正常 ps.setString(8, AppStringUtils.isNotEmpty(userId) ? userId : "");// 用戶ID } } ); } catch (Exception e) { final long endTime = System.currentTimeMillis(); final String errorMsg = getStackTrace(e); jdbcTemplate.update("INSERT INTO PROCEDURE_LOGS VALUES(?,?,?,?,?,?,?,?)", new PreparedStatementSetter() { public void setValues(PreparedStatement ps) throws SQLException { ps.setString(1, JugHelper.generalUUID()); ps.setString(2, procedureName); ps.setString(3, JSON.toJSONString(params)); ps.setString(4, errorMsg); ps.setTimestamp(5, new Timestamp(new Date().getTime())); ps.setInt(6, Integer.valueOf((endTime-startTime)+"")); ps.setString(7, "0");// 異常 ps.setString(8, AppStringUtils.isNotEmpty(userId) ? 
userId : "");// 用戶ID } } ); } return resultList; } /** * 獲取完整的異常堆棧信息 * @param throwable * @return */ protected String getStackTrace(Throwable throwable) { StringWriter sw = null; PrintWriter pw = null; try { sw = new StringWriter(); pw = new PrintWriter(sw, true); throwable.printStackTrace(pw); return sw.getBuffer().toString(); } catch (Exception e) { e.printStackTrace(); return e.getMessage(); } finally { if (sw!=null) { try { sw.close(); } catch (IOException e) { e.printStackTrace(); } } if (pw!=null) { pw.close(); } } } public void setBiJdbcTemplate(JdbcTemplate biJdbcTemplate) { this.biJdbcTemplate = biJdbcTemplate; } public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; }
6.工具類ThreadUtils
package com.pupeiyuan.go; public class ThreadUtils { /*ThreadLocal:將變量與當前線程綁定*/ private static ThreadLocal<Object> threadLocal = new ThreadLocal<Object>(); public static void set(Object value) { threadLocal.set(value); } public static Object get() { return threadLocal.get(); } public static void remove() { threadLocal.remove(); } }
7.業務類
package com.pupeiyuan.go;

/*
 * Business-side usage example: builds four PieTask instances, runs them through
 * TaskExecutor.compute and assembles the results into a list of HrPieData.
 *
 * NOTE(review): the statements below sit directly in the class body and use orgId, orgType,
 * timeDim, postId, roleId, userId and getPieItemResult, none of which are declared in this
 * listing. Presumably this is the body of a service method pasted without its enclosing
 * declaration — confirm against the real source before compiling.
 */
public class test {
    // Fetch the index ID lists for each pie chart.
    List<String> pieWorkYearIndexList = HrIndexConstant.getPieWorkYearIndexList();
    List<String> pieEducationIndexList = HrIndexConstant.getPieEducationIndexList();
    List<String> pieCapacityIndexList = HrIndexConstant.getPieCapacityIndexList();
    List<String> pieConsultorStarIndexList = HrIndexConstant.getPieConsultorStarIndexList();
    List<String> pieRecruitChannelIndexList = HrIndexConstant.getPieRecruitChannelIndexList();

    // Union of all index IDs, used only for the permission lookup below.
    List<String> allIndex = new ArrayList<String>();
    allIndex.addAll(pieWorkYearIndexList);
    allIndex.addAll(pieCapacityIndexList);
    allIndex.addAll(pieEducationIndexList);
    allIndex.addAll(pieConsultorStarIndexList);
    allIndex.addAll(pieRecruitChannelIndexList);

    // Resolve index permissions.
    Set<String> validIndexSet = CommonConstant.getValidIndexList(orgId, postId, roleId, allIndex);
    validIndexSet = HrIndexConstant.handleHrInnerPriv(validIndexSet,postId);

    // One task per pie chart; all four call the same stored procedure ("PROC_HR_PIE") and
    // differ only in the index list they request.
    PieTask workYearTask = new PieTask();
    workYearTask.setOrgId(orgId);
    workYearTask.setOrgType(orgType);
    workYearTask.setTimeDim(timeDim);
    workYearTask.setIndexIdList(pieWorkYearIndexList);
    workYearTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    workYearTask.setValidIndexSet(validIndexSet);
    workYearTask.setUserId(userId);

    PieTask educationTask = new PieTask();
    educationTask.setOrgId(orgId);
    educationTask.setOrgType(orgType);
    educationTask.setTimeDim(timeDim);
    educationTask.setIndexIdList(pieEducationIndexList);
    educationTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    educationTask.setValidIndexSet(validIndexSet);
    educationTask.setUserId(userId);

    PieTask consultorStarTask = new PieTask();
    consultorStarTask.setOrgId(orgId);
    consultorStarTask.setOrgType(orgType);
    consultorStarTask.setTimeDim(timeDim);
    consultorStarTask.setIndexIdList(pieConsultorStarIndexList);
    consultorStarTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    consultorStarTask.setValidIndexSet(validIndexSet);
    consultorStarTask.setUserId(userId);

    PieTask recruitChannelTask = new PieTask();
    recruitChannelTask.setOrgId(orgId);
    recruitChannelTask.setOrgType(orgType);
    recruitChannelTask.setTimeDim(timeDim);
    recruitChannelTask.setIndexIdList(pieRecruitChannelIndexList);
    recruitChannelTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    recruitChannelTask.setValidIndexSet(validIndexSet);
    recruitChannelTask.setUserId(userId);

    // The capacity pie task is disabled in the original listing.
    /*CapacityPieTask capacityTask = new CapacityPieTask();
    capacityTask.setOrgId(orgId);
    capacityTask.setOrgType(orgType);
    capacityTask.setTimeDim(timeDim);
    capacityTask.setIndexIdList(pieCapacityIndexList);
    capacityTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    capacityTask.setValidIndexSet(validIndexSet);
    capacityTask.setUserId(userId); */

    // Run all tasks on the shared pool; compute() blocks while it waits for them.
    List<BaseTask> taskList = new ArrayList<BaseTask>(5);
    taskList.add(workYearTask);
    //taskList.add(capacityTask);
    taskList.add(educationTask);
    taskList.add(consultorStarTask);
    taskList.add(recruitChannelTask);
    TaskExecutor.compute(taskList);

    // Post-processing: read each task's stored result and build one HrPieData per chart.
    // convertPercentage is a static method of PieTask that is invoked through an instance here.
    List<HrPieData> pieList = new ArrayList<HrPieData>();

    HrPieData pieData = new HrPieData();
    pieData.setPieName("司齡分布");
    pieData.setSubIndexList(getPieItemResult(pieWorkYearIndexList, workYearTask.getTaskResultMap()));
    workYearTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);

    pieData = new HrPieData();
    pieData.setPieName("學歷分布");
    pieData.setSubIndexList(getPieItemResult(pieEducationIndexList, educationTask.getTaskResultMap()));
    educationTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);

    pieData = new HrPieData();
    pieData.setPieName("經紀人星級分布");
    pieData.setSubIndexList(getPieItemResult(pieConsultorStarIndexList, consultorStarTask.getTaskResultMap()));
    consultorStarTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);

    pieData = new HrPieData();
    pieData.setPieName("招聘渠道分布");
    pieData.setSubIndexList(getPieItemResult(pieRecruitChannelIndexList, recruitChannelTask.getTaskResultMap()));
    recruitChannelTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
}
使用CompletionService結合ExecutorService批處理調用存儲過程任務實例