1. 程式人生 > >Jest客戶端索引新增、資料查詢、分頁例項

Jest客戶端索引新增、資料查詢、分頁例項


/**
 * 〈一句話功能簡述〉操作ES的Jest客戶端<br>
 * 〈功能詳細描述〉
 *
 * @author wangzha
 * @see [相關類/方法](可選)
 * @since [產品/模組版本] (可選)
 */
@Service
public class JestService<T> implements InitializingBean {

    public static Logger logger = LoggerFactory.getLogger(JestService.class);

    /**
     * jest客戶端,單例
     */
private static JestClient client = null; @Value("${es.cluster}") private String cluster; /** * es 讀取超時時間 */ private static final String ES_READ_TIMEOUT = "es.read.timeout"; /** * es 連線超時時間 */ private static final String ES_CONNECT_TIMEOUT = "es.connect.timeout"
; /** * 滾動ID key */ private static final String SCORLL_ID_KEY = "_scroll_id"; /** * 命中集 key */ private static final String QUERY_HITS_KEY = "hits"; /** * 資料來源 key */ private static final String SOURCE_KEY = "_source"; /** * 搜尋上下文的時間,用來支援該批次 */
private static final String SCROLL_ALIVE_TIME = "5m"; @Override public void afterPropertiesSet() throws Exception { init(); } /** * 客戶端初始化 */ public void init() { logger.info("ESJestClient Init Start..."); synchronized (JestService.class) { if (client != null) return; // String[] uriArr = cluster.split(Symbols.COMMA); String[] uriArr = new String[]{"http://10.37.2.142:9900", "http://10.37.2.143:9900", "http://10.37.2.144:9900"}; if (uriArr.length > 0) { Set<String> serverUris = new LinkedHashSet<String>(); for (int i = 0; i < uriArr.length; i++) { if (StringUtils.isNotEmpty(uriArr[i])) serverUris.add(uriArr[i]); } JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris) // .connTimeout(SCMServiceCfg.getNodeTransInt(ES_CONNECT_TIMEOUT)) // .readTimeout(SCMServiceCfg.getNodeTransInt(ES_READ_TIMEOUT)) .connTimeout(1000) .readTimeout(10000) .multiThreaded(true) .build()); client = factory.getObject(); logger.info(String.format("ESJestClient init success,ip:[%s]", uriArr)); } else { throw new AppException("ESJestClient init error,uri is null,Check scm ! ", ResponseCode.FAIL.getCode()); } } } /** * 資源釋放 */ public void destory() { if (client != null) { client.shutdownClient(); logger.debug("ESJestClient destory ! "); } } /** * 獲取指定索引資訊 * * @param indexName * @param typeName * @return */ public String getMapping(String indexName, String typeName) { GetMapping.Builder builder = new GetMapping.Builder(); builder.addIndex(indexName).addType(typeName); String res = null; try { JestResult result = client.execute(builder.build()); if (result != null && result.isSucceeded()) { res = result.getSourceAsObject(JsonObject.class).toString(); } } catch (Exception e) { logger.error("es get mapping Exception ", e); throw new AppException("獲取索引資訊失敗", ResponseCode.FAIL.getCode()); } return res; } /** * 插入單條資料 * 若該條資料已經存在,則覆蓋。 * * @param t * @return */ public boolean insertOrUpdateDoc(T t, String uniqueId, String index, String type) { //是否插入成功標識 boolean flag = false; Index.Builder builder = new Index.Builder(t); builder.id(uniqueId); builder.refresh(true); Index indexDoc = builder.index(index).type(type).build(); JestResult result; try { result = client.execute(indexDoc); if (result != null && result.isSucceeded()) flag = true; } catch (IOException e) { logger.error("ESJestClient insertDoc exception", e); } return flag; } /** * 批量插入資料 * * @param list * @param index * @param type * @return */ @SuppressWarnings("unchecked") public BatchOperaResult batchInsertDoc(List<T> list, String index, String type) { //批量資料操作結果 BatchOperaResult batchOperaResult = null; try { Bulk.Builder bulkBuilder = new Bulk.Builder(); //迴圈構造批量資料 for (T t : list) { Index indexDoc = new Index.Builder(t).index(index).type(type).build(); bulkBuilder.addAction(indexDoc); } BulkResult br = client.execute(bulkBuilder.build()); if (!br.isSucceeded()) { batchOperaResult = new BatchOperaResult(false, list, br.getFailedItems()); } else { batchOperaResult = new BatchOperaResult(true, list); } } catch (Exception e) { logger.error("ESJestClient.batchInsertDoc-exception", e); } return batchOperaResult; } /** * 組合查詢文件+滾動分頁 * 採用條件:資料量大,每頁的size應該很大 * * @param index * @param type * @param combinedQueryDto * @return * @throws Exception */ @SuppressWarnings("unchecked") public CombinedQueryResult combinedQueryByScroll(String index, String type, CombinedQueryDto combinedQueryDto) { CombinedQueryResult res = null; String scrollId = combinedQueryDto.getScrollId(); Map<String, Object> req = combinedQueryDto.getParams(); JestResult result = null; try { //首次查詢或滾動時間超時,則重新查詢 if (StringUtils.isEmpty(scrollId)) { //清除滾動ID clearScrollIds(); //迴圈構造查詢條件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); for (Map.Entry entry : req.entrySet()) { //過濾器中,must表示查詢出來的文件必須包含傳入的值 boolQueryBuilder.must(QueryBuilders.matchQuery(entry.getKey().toString(), entry.getValue())); } searchSourceBuilder.query(boolQueryBuilder).size(combinedQueryDto.getSize()); //構造查詢條件,設定索引及型別 Search search = new Search.Builder(searchSourceBuilder.toString()) .addIndex(index).addType(type).setParameter(Parameters.SCROLL, SCROLL_ALIVE_TIME) .build(); //第一次檢索,拍下快照 result = client.execute(search); } else { //只能向後滾動,不能向前滾動 for (int i = 0; i < combinedQueryDto.getScrollTime(); i++) { //直接滾動 SearchScroll scroll = new SearchScroll.Builder(scrollId, SCROLL_ALIVE_TIME).build(); result = client.execute(scroll); //第一次滾動時,判斷scrollId是否過期,過期丟擲異常 if (i == 1) { JsonObject errMsg = result.getJsonObject().getAsJsonObject("error"); if (errMsg != null) { throw new AppException(errMsg.getAsJsonArray("root_cause") .getAsString(), ResponseCode.ERROR.getCode()); } } } } if (result != null && !result.isSucceeded()) { throw new AppException("ESJestClient ScrollQuery Fail...", ResponseCode.FAIL.getCode()); } //構造返回查詢返回結果 res = buildResponse(result); } catch (IOException e) { logger.error("ESJestClient ScrollQuery IOException...", e); throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode()); } catch (Exception e) { logger.error("ESJestClient ScrollQuery Exception...", e); throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode()); } return res; } /** * 清楚滾動ID */ private void clearScrollIds() { ClearScroll clearScroll = new ClearScroll.Builder().build(); try { client.execute(clearScroll); } catch (IOException e) { logger.error("ESJestClient Clean ScrollIds Exception...", e); } } /** * 結果集為空時,gson串事例: * <p> * {"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}} * * @param result * @return */ @SuppressWarnings("unchecked") private CombinedQueryResult buildResponse(JestResult result) { CombinedQueryResult res = new CombinedQueryResult(); JsonObject jsonObject = result.getJsonObject(); JsonArray jsonElements = jsonObject.getAsJsonObject(QUERY_HITS_KEY).getAsJsonArray(QUERY_HITS_KEY); List<T> list = new ArrayList<T>(); Gson gson = new Gson(); for (JsonElement jsonElement : jsonElements) { list.add((T) gson.fromJson(jsonElement, Map.class).get(SOURCE_KEY)); } String scrollId = jsonObject.getAsJsonPrimitive(SCORLL_ID_KEY).getAsString(); //不為空,才算文件查詢成功 if (list.size() > 0) { res.setResList(list); res.setScrollId(scrollId); } else throw new AppException("ESJestClient ScrollQuery Fail...", ResponseCode.FAIL.getCode()); return res; } /** * 組合查詢+深入分頁 * 採用條件:資料量小,不會進行大翻頁時 * * @param index * @param type * @param combinedQueryDto * @return */ public CombinedQueryResult combinedQueryByFromSize(String index, String type, CombinedQueryDto combinedQueryDto) { CombinedQueryResult res = null; JestResult result; Map<String, Object> req = combinedQueryDto.getParams(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); List<T> list; try { for (Map.Entry entry : req.entrySet()) { boolQueryBuilder.must(QueryBuilders.matchQuery(entry.getKey().toString(), entry.getValue())); } searchSourceBuilder.query(boolQueryBuilder).size(combinedQueryDto.getSize()).from(combinedQueryDto.getFrom()); Search search = new Search.Builder(searchSourceBuilder.toString()) .addIndex(index).addType(type).setParameter(Parameters.SCROLL, SCROLL_ALIVE_TIME) .setSearchType(SearchType.SCAN).build(); result = client.execute(search); if (result != null && !result.isSucceeded()) { throw new AppException("ESJestClient FromSizeQuery Fail...", ResponseCode.FAIL.getCode()); } list = null; JsonArray jsonElements = result.getJsonObject().getAsJsonObject(QUERY_HITS_KEY).getAsJsonObject(QUERY_HITS_KEY).getAsJsonArray(SOURCE_KEY); if (jsonElements != null) { list = new Gson().fromJson(jsonElements, new TypeToken<List<T>>() { }.getType()); } } catch (IOException e) { logger.error("ESJestClient FromSizeQuery Exception...", e); throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode()); } catch (Exception e) { logger.error("ESJestClient FromSizeQuery Exception...", e); throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode()); } res.setResList(list); return res; } /** * 建立索引 * * @param index 索引名稱 */ public boolean createIndex(String index) { boolean result = false; try { JestResult jestResult = client.execute(new CreateIndex.Builder(index).build()); if (jestResult != null && jestResult.isSucceeded()) { result = true; } } catch (IOException e) { logger.error("EsJestClient-createIndex error", e); throw new AppException("索引建立失敗"); } return result; } /** * @param index * @param type * @param mappingStr Gson串 {"service_log": {"properties": {"id": {"type": "string"},"service": {"type": "string"},"app_code": {"type": "string"}}}} * @return */ public boolean createIndexMapping(String index, String type, String mappingStr) { boolean result = false; PutMapping putMapping = new PutMapping.Builder(index, type, mappingStr).build(); try { JestResult jestResult = client.execute(putMapping); if (jestResult != null && jestResult.isSucceeded()) { result = true; } } catch (IOException e) { e.printStackTrace(); } return result; } }