批量介面多執行緒併發執行
阿新 • • 發佈:2019-01-31
開發中可能會有這樣的場景,一個功能需要同時完成不相關的多個操作,最後針對多個操作結果統一處理。比如一個查詢功能內部需要同時查詢A、B、C、D四個介面,彙總所有介面查詢內容後返回,一般來說可以逐個查詢彙總即可。
現在問題來了,如介面超時時間限制為3秒,且A、B、C、D四個介面每個介面RT=1,每一個介面都有RT消耗,則同時完成四個介面查詢併合並返回的時間必然大於4秒,導致總查詢功能介面超時。一般大家會想到使用Future,即希望通過多執行緒併發執行解決該問題。查詢執行開始,啟動四個執行緒同時執行A、B、C、D四個介面查詢,最後合併多個執行緒查詢結果即可。開發過程中一般結合線程池配合,以保證更好的執行緒管理。
簡單示例如下<僅供參考>:
1、執行任務執行緒
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; /** * 服務提供描述 */ public class ServiceInvoker implements Callable<List<InsurePolicy>> { private InsureQueryService insureQueryService;//介面 private String userId;//引數 private String desc;//介面描述,方便介面異常統計 public ServiceInvoker() { } /** * 執行緒任務構造類 */ public ServiceInvoker(String userId, String method, String desc, InsureQueryService insureQueryService) { this.userId = userId; this.desc = desc; this.insureQueryService = insureQueryService; } @Override public List<InsurePolicy> call() throws Exception { try{ long start = System.currentTimeMillis(); InsureResult<List<InsurePolicy>> insureResult = insureQueryService.queryPolicyList("***", userId); long end = System.currentTimeMillis(); LoggerUtil.rtLog("UserId[" + userId + "],Interface[" + desc + "],耗時: ", end - start); return insureResult.getResult(); }catch(Exception e){ LoggerUtil.log("ServiceInvoker_queryPolicyList_callback error:userId=" + userId, e); } return new ArrayList<InsurePolicy>(); } }
2、執行緒池-批任務併發執行
import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.annotation.Resource; /** * 多執行緒批量請求介面 */ public class BatchInvokerServiceImpl implements BatchInvokerService { /** * 呼叫的執行緒池大小 */ private int poolSize = 60; private int timeOut = 3000; private ExecutorService pool = Executors.newFixedThreadPool(poolSize); @Resource private InsureQueryService insureQueryService1; @Resource private InsureQueryService insureQueryService2; private List<InsurePolicy> invokeQueryPolicy(List<ServiceInvoker> invokerList) { List<InsurePolicy> results = new ArrayList<InsurePolicy>(); try { int index = 0; List<Future<List<InsurePolicy>>> futures = pool.invokeAll(invokerList, timeOut, TimeUnit.MILLISECONDS); for (Future<List<InsurePolicy>> future : futures) { try { if (future != null) { if (future.isCancelled()) { LoggerUtil.log("insure_invokeHSFQueryPolicy_fail:[" + InterfaceEnum.getByIndex(index) + "]Future已超時被終止"); continue; } List<InsurePolicy> resultList = future.get(); if (null != resultList && !resultList.isEmpty()) { results.addAll(resultList); } } } catch (Exception e) { LoggerUtil.log("insure_invokeHSFQueryPolicy_error:[" + InterfaceEnum.getByIndex(index) + "]Feture取值異常", e); } } } catch (Exception e) { LoggerUtil.log("insure_invokeHSFQueryPolicy_error:執行緒池異常", e); } return results; } }
3、對外查詢介面定義
/**
* 執行批處理執行緒
*/
public List<InsurePolicy> queryPolicyList(String userId) {
List<ServiceInvoker> invokers = new ArrayList<ServiceInvoker>();
invokers.add(new ServiceInvoker(userId, "A-DB查詢", insureQueryService1));
invokers.add(new ServiceInvoker(userId, "B-DB查詢", insureQueryService2));
return invokeQueryPolicy(invokers);
}
應用中可以做更多的優化和擴充套件,支援不同型別介面、引數併發執行,以實現工具通用化。