多執行緒實現Callable資料查詢
阿新 • • 發佈:2019-01-10
在做 ES 查詢時,若需要通過 terms 進行查詢,而 terms 一次最多只能查詢 1000 個值,就必須將查詢值分批。如果以單執行緒逐批迭代查詢會比較耗時,因此採用多執行緒分批查詢:每一個批次交給一個執行緒執行,並通過固定大小的執行緒池來排程這些執行緒,防止 terms 值太多、批次過多時造成執行緒數過多。這樣可以有效提升查詢效能,其他相似場景亦可採用。
定義執行緒類:
/** * @Author: MR LIS * @Description:terms多執行緒查詢實現 * @Date: Create in 17:05 2018/7/3 * @Modified By: */ @Service(value = "multiTermsESQueryThread") @Scope(value="prototype") public class MultiTermsESQueryThread implements Callable<List<Map<String, Object>>> { private List<String> indexs; private List<String> types; private String proName; private List<Object> proValueList; @Override public List<Map<String, Object>> call() throws Exception { TransportClient client = ESUtils.getClient(); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(); String[] indexNew = DataOperateUtils.tranListToArray(indexs); searchRequestBuilder.setIndices(indexNew); //一次性查詢幾個index 進行搜尋 大類情況下的搜尋 if (types != null && !types.isEmpty()) { searchRequestBuilder.setTypes(types.toArray(new String[0])); } List<Map<String, Object>> datas = new ArrayList<>(); searchRequestBuilder.setQuery(QueryBuilders.termsQuery(proName, proValueList)); searchRequestBuilder.setSize(10000);//一次最多可以返回的記錄數 SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchHits hits = searchResponse.getHits(); List<Long> nodeIdShipIdList = new ArrayList<>(); for (SearchHit searchHitFields : hits.getHits()) { Map<String, Object> sourceAsMap = searchHitFields.getSourceAsMap(); DataOperateUtils.expendProperty(searchHitFields, sourceAsMap); boolean isNode = sourceAsMap.containsKey(StaticParameterUtils.ES_NODE_ID_NAME); //檢視是否是本體 if (isNode) { // 本體 Long nodeId = new Long(sourceAsMap.get(StaticParameterUtils.ES_NODE_ID_NAME).toString()); if(!nodeIdShipIdList.contains(nodeId)){ nodeIdShipIdList.add(nodeId); }else{ continue; } } else { //關係 Long relId = new Long(sourceAsMap.get(StaticParameterUtils.ES_RELATIONSHIP_ID_NAME).toString()); if(!nodeIdShipIdList.contains(relId)){ nodeIdShipIdList.add(relId); }else{ continue; } } datas.add(sourceAsMap); } nodeIdShipIdList.clear(); nodeIdShipIdList=null; return datas; } public void setIndexs(List<String> indexs) { this.indexs = indexs; } public void 
setTypes(List<String> types) { this.types = types; } public void setProName(String proName) { this.proName = proName; } public void setProValueList(List<Object> proValueList) { this.proValueList = proValueList; } }
call()返回指定的型別,如:List<Map<String, Object>>
進行方法定義,分批次,每一個批次都通過一個執行緒去執行:
@Override public List<Map<String, Object>> termsDataByProMulti(List<String> indexs, String type, String proName, List<Object> proValueList) { List<Map<String, Object>> datas = new ArrayList<>(); //定義執行緒池 ExecutorService threadPool = Executors.newFixedThreadPool(5); try { List<String> types = new ArrayList<>(); if (StringUtils.isNotBlank(type)) { types.add(type); } int loopNum = (int) Math.ceil((float) proValueList.size() / 1000); List<Object> subProValueList = null; List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<>(); for (int i = 0; i < loopNum; i++) { int fromIndex = i * 1000; int toIndex = 0; if (i == (loopNum - 1)) { toIndex = proValueList.size(); } else { toIndex = (i + 1) * 1000; } subProValueList = proValueList.subList(fromIndex, toIndex); MultiTermsESQueryThread termsESQueryThread = (MultiTermsESQueryThread) SpringUtils.getBean("multiTermsESQueryThread"); termsESQueryThread.setIndexs(indexs); termsESQueryThread.setProName(proName); termsESQueryThread.setTypes(types); termsESQueryThread.setProValueList(subProValueList); tasks.add(termsESQueryThread); } List<Future<List<Map<String, Object>>>> futures = threadPool.invokeAll(tasks); for (Future<List<Map<String, Object>>> future : futures) { List<Map<String, Object>> mapList = future.get(); if (mapList != null) { datas.addAll(mapList); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { // 關閉執行緒池 threadPool.shutdown(); } return datas; }