非同步併發利器:實際專案中使用CompletionService提升系統性能的一次實踐
場景
隨著網際網路應用的深入,很多傳統行業也都需要接入到網際網路。我們公司也是這樣,保險核心需要和很多保險中介對接,比如阿里、京東等等。這些公司對於介面服務的效能有些比較高的要求,傳統的核心無法滿足要求,所以資訊科技部領導高瞻遠矚,決定開發網際網路接入服務,滿足來自效能的需求。
概念
CompletionService將Executor和BlockingQueue的功能融合在一起,將Callable任務提交給CompletionService來執行,然後使用類似於佇列操作的take和poll等方法來獲得已完成的結果,而這些結果會在完成時被封裝為Future。對於更多的概念,請參閱其他網路文件。
執行緒池的設計,阿里開發手冊說過不要使用Java Executors 提供的預設執行緒池,因此需要更接近實際的情況來自定義一個執行緒池,根據多次壓測,採用的執行緒池如下:
public ExecutorService getThreadPool(){
return new ThreadPoolExecutor(75,
125,
180000,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(450),
new ThreadPoolExecutor.CallerRunsPolicy());
}
說明:公司的業務為低頻交易,對於單次呼叫效能要求高,但是併發壓力根本不大,所以 阻塞佇列已滿且執行緒數達到最大值時所採取的飽和策略為呼叫者執行。
實現
業務
投保業務主要涉及這幾個大的方面:投保校驗、核保校驗、保費試算
-
投保校驗:最主要的是要查詢客戶黑名單和風險等級,都是千萬級的表。而且投保人和被保人都需要校驗
-
核保校驗:除了常規的核保規則校驗,查詢千萬級的大表,還需要呼叫外部智慧核保介面獲得使用者的風險等級,投保人和被保人都需要校驗
-
保費試算:需要計算每個險種的保費
設計
根據上面的業務,如果序列執行的話,單次效能肯定不高,所以考慮多執行緒非同步執行獲得校驗結果,再對結果綜合判斷
-
投保校驗:採用一個執行緒(也可以根據投保人和被保人數量來採用幾個執行緒)
-
核保校驗:
-
常規校驗:採用一個執行緒
-
外部呼叫:有幾個使用者(指投保人和被保人)就採用幾個執行緒
保費計算:有幾個險種就採用幾個執行緒,最後合併得到整個的保費
程式碼
以下程式碼是樣例,實際邏輯已經去掉
先建立投保、核保(常規、外部呼叫)、保費計算4個業務服務類:
投保服務類:InsuranceVerificationServiceImpl,假設耗時50ms
@Service
public class InsuranceVerificationServiceImpl implements InsuranceVerificationService {
private static final Logger logger = LoggerFactory.getLogger(InsuranceVerificationServiceImpl.class);
@Override
public TaskResponseModel<Object> insuranceCheck(String key, PolicyModel policyModel) {
try {
//假設耗時50ms
Thread.sleep(50);
return TaskResponseModel.success().setKey(key).setData(policyModel);
} catch (InterruptedException e) {
logger.warn(e.getMessage());
return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}
核保常規校驗服務類:UnderwritingCheckServiceImpl,假設耗時50ms
@Service
public class UnderwritingCheckServiceImpl implements UnderwritingCheckService {
private static final Logger logger = LoggerFactory.getLogger(UnderwritingCheckServiceImpl.class);
@Override
public TaskResponseModel<Object> underwritingCheck(String key, PolicyModel policyModel) {
try {
//假設耗時50ms
Thread.sleep(50);
return TaskResponseModel.success().setKey(key).setData(policyModel);
} catch (InterruptedException e) {
logger.warn(e.getMessage());
return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}
核保外部呼叫服務類:ExternalCallServiceImpl,假設耗時200ms
@Service
public class ExternalCallServiceImpl implements ExternalCallService {
private static final Logger logger = LoggerFactory.getLogger(ExternalCallServiceImpl.class);
@Override
public TaskResponseModel<Object> externalCall(String key, Insured insured) {
try {
//假設耗時200ms
Thread.sleep(200);
ExternalCallResultModel externalCallResultModel = new ExternalCallResultModel();
externalCallResultModel.setIdcard(insured.getIdcard());
externalCallResultModel.setScore(200);
return TaskResponseModel.success().setKey(key).setData(externalCallResultModel);
} catch (InterruptedException e) {
logger.warn(e.getMessage());
return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}
試算服務類:TrialCalculationServiceImpl,假設耗時50ms
@Service
public class TrialCalculationServiceImpl implements TrialCalculationService {
private static final Logger logger = LoggerFactory.getLogger(TrialCalculationServiceImpl.class);
@Override
public TaskResponseModel<Object> trialCalc(String key, Risk risk) {
try {
//假設耗時50ms
Thread.sleep(50);
return TaskResponseModel.success().setKey(key).setData(risk);
} catch (InterruptedException e) {
logger.warn(e.getMessage());
return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}
統一返回介面類:TaskResponseModel, 上面4個服務的方法統一返回TaskResponseModel
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@Accessors(chain = true)
public class TaskResponseModel<T extends Object> implements Serializable {
private String key; //唯一呼叫標誌
private String resultCode; //結果碼
private String resultMessage; //結果資訊
private T data; //業務處理結果
public static TaskResponseModel<Object> success() {
TaskResponseModel<Object> taskResponseModel = new TaskResponseModel<>();
taskResponseModel.setResultCode("200");
return taskResponseModel;
}
public static TaskResponseModel<Object> failure() {
TaskResponseModel<Object> taskResponseModel = new TaskResponseModel<>();
taskResponseModel.setResultCode("400");
return taskResponseModel;
}
}
注:
-
key為這次呼叫的唯一標識,由呼叫者傳進來
-
resultCode結果碼,200為成功,400表示有異常
-
resultMessage資訊,表示不成功或者異常資訊
-
data業務處理結果,如果成功的話
-
這些服務類都是單例模式
要使用用CompletionService的話,需要建立實現了Callable介面的執行緒
投保Callable:
@Data
@AllArgsConstructor
public class InsuranceVerificationCommand implements Callable<TaskResponseModel<Object>> {
private String key;
private PolicyModel policyModel;
private final InsuranceVerificationService insuranceVerificationService;
@Override
public TaskResponseModel<Object> call() throws Exception {
return insuranceVerificationService.insuranceCheck(key, policyModel);
}
}
核保常規校驗Callable:
@Data
@AllArgsConstructor
public class UnderwritingCheckCommand implements Callable<TaskResponseModel<Object>> {
private String key;
private PolicyModel policyModel;
private final UnderwritingCheckService underwritingCheckService;
@Override
public TaskResponseModel<Object> call() throws Exception {
return underwritingCheckService.underwritingCheck(key, policyModel);
}
}
核保外部呼叫Callable:
@Data
@AllArgsConstructor
public class ExternalCallCommand implements Callable<TaskResponseModel<Object>> {
private String key;
private Insured insured;
private final ExternalCallService externalCallService;
@Override
public TaskResponseModel<Object> call() throws Exception {
return externalCallService.externalCall(key, insured);
}
}
試算呼叫Callable:
@Data
@AllArgsConstructor
public class TrialCalculationCommand implements Callable<TaskResponseModel<Object>> {
private String key;
private Risk risk;
private final TrialCalculationService trialCalculationService;
@Override
public TaskResponseModel<Object> call() throws Exception {
return trialCalculationService.trialCalc(key, risk);
}
}
注:
-
每一次呼叫,需要建立這4種Callable
-
返回統一介面TaskResopnseModel
非同步執行的類:TaskExecutor
@Component
public class TaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);
//執行緒池
private final ExecutorService executorService;
public TaskExecutor(ExecutorService executorService) {
this.executorService = executorService;
}
//非同步執行,獲取所有結果後返回
public List<TaskResponseModel<Object>> execute(List<Callable<TaskResponseModel<Object>>> commands) {
//建立非同步執行物件
CompletionService<TaskResponseModel<Object>> completionService = new ExecutorCompletionService<>(executorService);
for (Callable<TaskResponseModel<Object>> command : commands) {
completionService.submit(command);
}
//獲取所有非同步執行執行緒的結果
int taskCount = commands.size();
List<TaskResponseModel<Object>> params = new ArrayList<>(taskCount);
try {
for (int i = 0; i < taskCount; i++) {
Future<TaskResponseModel<Object>> future = completionService.take();
params.add(future.get());
}
} catch (InterruptedException | ExecutionException e) {
//異常處理
params.clear();
params.add(TaskResponseModel.failure().setKey("error").setResultMessage("非同步執行執行緒錯誤"));
}
//返回,如果執行中發生error, 則返回相應的key值:error
return params;
}
}
注:
-
為單例模式
-
接收引數為List<Callable<TaskResponseModel<Object>>>,也就是上面定義的4種Callable的列表
-
返回List<TaskResponseModel<Object>>,也就是上面定義4種Callable返回的結果列表
-
我們的業務是對返回結果統一判斷,業務返回結果有因果關係
-
如果執行緒執行有異常,也返回List<TaskResponseModel>,這個時候列表中只有一個TaskResponseModel,key為error, 後續呼叫者可以通過這個來判斷執行緒是否執行成功;
呼叫方:CompletionServiceController
@RestController
public class CompletionServiceController {
//投保key
private static final String INSURANCE_KEY = "insurance_";
//核保key
private static final String UNDERWRITING_KEY = "underwriting_";
//外部呼叫key
private static final String EXTERNALCALL_KEY = "externalcall_";
//試算key
private static final String TRIA_KEY = "trial_";
private static final Logger logger = LoggerFactory.getLogger(CompletionServiceController.class);
private final ExternalCallService externalCallService;
private final InsuranceVerificationService insuranceVerificationService;
private final TrialCalculationService trialCalculationService;
private final UnderwritingCheckService underwritingCheckService;
private final TaskExecutor taskExecutor;
public CompletionServiceController(ExternalCallService externalCallService, InsuranceVerificationService insuranceVerificationService, TrialCalculationService trialCalculationService, UnderwritingCheckService underwritingCheckService, TaskExecutor taskExecutor) {
this.externalCallService = externalCallService;
this.insuranceVerificationService = insuranceVerificationService;
this.trialCalculationService = trialCalculationService;
this.underwritingCheckService = underwritingCheckService;
this.taskExecutor = taskExecutor;
}
//多執行緒非同步併發介面
@PostMapping(value = "/async", headers = "Content-Type=application/json;charset=UTF-8")
public String asyncExec(@RequestBody PolicyModel policyModel) {
long start = System.currentTimeMillis();
asyncExecute(policyModel);
logger.info("非同步總共耗時:" + (System.currentTimeMillis() - start));
return "ok";
}
//序列呼叫介面
@PostMapping(value = "/sync", headers = "Content-Type=application/json;charset=UTF-8")
public String syncExec(@RequestBody PolicyModel policyModel) {
long start = System.currentTimeMillis();
syncExecute(policyModel);
logger.info("同步總共耗時:" + (System.currentTimeMillis() - start));
return "ok";
}
private void asyncExecute(PolicyModel policyModel) {
List<Callable<TaskResponseModel<Object>>> baseTaskCallbackList = new ArrayList<>();
//根據被保人外部介面呼叫
for (Insured insured : policyModel.getInsuredList()) {
ExternalCallCommand externalCallCommand = new ExternalCallCommand(EXTERNALCALL_KEY + insured.getIdcard(), insured, externalCallService);
baseTaskCallbackList.add(externalCallCommand);
}
//投保校驗
InsuranceVerificationCommand insuranceVerificationCommand = new InsuranceVerificationCommand(INSURANCE_KEY, policyModel, insuranceVerificationService);
baseTaskCallbackList.add(insuranceVerificationCommand);
//核保校驗
UnderwritingCheckCommand underwritingCheckCommand = new UnderwritingCheckCommand(UNDERWRITING_KEY, policyModel, underwritingCheckService);
baseTaskCallbackList.add(underwritingCheckCommand);
//根據險種進行保費試算
for(Risk risk : policyModel.getRiskList()) {
TrialCalculationCommand trialCalculationCommand = new TrialCalculationCommand(TRIA_KEY + risk.getRiskcode(), risk, trialCalculationService);
baseTaskCallbackList.add(trialCalculationCommand);
}
List<TaskResponseModel<Object>> results = taskExecutor.execute(baseTaskCallbackList);
for (TaskResponseModel<Object> t : results) {
if (t.getKey().equals("error")) {
logger.warn("執行緒執行失敗");
logger.warn(t.toString());
}
logger.info(t.toString());
}
}
private void syncExecute(PolicyModel policyModel) {
//根據被保人外部介面呼叫
for (Insured insured : policyModel.getInsuredList()) {
TaskResponseModel<Object> externalCall = externalCallService.externalCall(insured.getIdcard(), insured);
logger.info(externalCall.toString());
}
//投保校驗
TaskResponseModel<Object> insurance = insuranceVerificationService.insuranceCheck(INSURANCE_KEY, policyModel);
logger.info(insurance.toString());
//核保校驗
TaskResponseModel<Object> underwriting = underwritingCheckService.underwritingCheck(UNDERWRITING_KEY, policyModel);
logger.info(underwriting.toString());
//根據險種進行保費試算
for(Risk risk : policyModel.getRiskList()) {
TaskResponseModel<Object> risktrial = trialCalculationService.trialCalc(risk.getRiskcode(), risk);
logger.info(risktrial.toString());
}
}
}
注:
1.為測試方便,提供兩個介面呼叫:一個是序列執行,一個是非同步併發執行
2.在非同步併發執行函式asyncExecute中:
-
根據有多少個被保人,建立多少個外部呼叫的Callable例項,key值為EXTERNALCALL_KEY + insured.getIdcard(),在一次保單投保呼叫中,每一個被保人Callable的key是不一樣的。
-
根據有多少個險種,建立多少個試算的Callable例項,key為TRIA_KEY + risk.getRiskcode(),在一次保單投保呼叫中,每一個險種的Callable的key是不一樣的
-
建立投保校驗的Callable例項,業務上只需要一個
-
建立核保校驗的Callable例項,業務上只需要一個
-
將Callable列表傳入到TaskExecutor執行非同步併發呼叫
-
根據返回結果來判斷,通過判斷返回的TaskResponseModel的key值可以知道是哪類業務校驗,分別進行判斷,還可以交叉判斷(公司的業務就是要交叉判斷)
驗證
驗證資料:
{
"insuredList":
[{"idcard":"laza","name":"320106"},
{"idcard":"ranran","name":"120102"}],
"policyHolder":"lazasha","policyNo":"345000987","riskList":
[{"mainFlag":1,"premium":300,"riskcode":"risk001","riskname":"險種一"},
{"mainFlag":0,"premium":400,"riskcode":"risk002","riskname":"險種二"}]
}
上面資料表明:有兩個被保人,兩個險種。按照我們上面的定義,會呼叫兩次外部介面,兩次試算,一次投保,一次核保。而在樣例程式碼中,一次外部介面呼叫耗時為200ms, 其他都為50ms.
本地開發的配置為8C16G:
-
同步序列介面呼叫計算:2 * 200 + 2 * 50 + 50 + 50 = 600ms
-
多執行緒非同步執行呼叫計算:按照多執行緒併發執行原理,取耗時最長的200ms
驗證:同步介面
輸出耗時:可以看到耗時601ms
驗證:多執行緒非同步執行介面
輸出耗時:可以看到為204ms
結果:基本和我們的預期相符合。
結束
這是將實際生產中的例子簡化出來,具體生產的業務比較複雜,不便於展示。
實際情況下,原來的介面需要1000ms以上才能完成單次呼叫,有的需要2000-3000ms。現在的介面,在生產兩臺8c16g的虛擬機器, 經過4個小時的簡單壓測能夠支援2000使用者併發,單次返回時長為350ms左右,服務很穩定,完全能夠滿足公司的業務發展需求。
提供的這個是可以執行的列子,程式碼在:https://github.com/lazasha111211/completionservice-demo.git