// Jest client examples: index creation, data queries, and pagination
// Posted by 阿新 on 2018-12-30
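/**
 * Elasticsearch access service built on the Jest HTTP client.
 * <p>
 * Offers index and mapping creation, single and bulk document indexing,
 * and combined (bool/match) queries with scroll or from/size pagination.
 *
 * @author wangzha
 */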
@Service
public class JestService<T> implements InitializingBean {
// Class-wide logger.
public static Logger logger = LoggerFactory.getLogger(JestService.class);
/**
 * Jest client shared by every instance of this service (static singleton).
 */
private static JestClient client = null;
// Value injected from the "es.cluster" property.
// NOTE(review): presumably a comma-separated list of node URIs - confirm against the deployment config.
@Value("${es.cluster}")
private String cluster;
/**
 * ES read timeout.
 * NOTE(review): the value looks like a configuration key rather than a duration - confirm.
 */
private static final String ES_READ_TIMEOUT = "es.read.timeout";
/**
 * ES connect timeout.
 * NOTE(review): the value looks like a configuration key rather than a duration - confirm.
 */
private static final String ES_CONNECT_TIMEOUT = "es.connect.timeout" ;
/**
 * JSON key of the scroll ID in a search response.
 */
private static final String SCORLL_ID_KEY = "_scroll_id";
/**
 * JSON key of the hit set in a search response.
 */
private static final String QUERY_HITS_KEY = "hits";
/**
 * JSON key of the document source inside a hit.
 */
private static final String SOURCE_KEY = "_source";
/**
 * Keep-alive time of the search context, passed with scroll requests to keep the batch available.
 */
private static final String SCROLL_ALIVE_TIME = "5m";
/**
 * {@code InitializingBean} callback; hands over to {@link #init()} to set up the Jest client.
 *
 * @throws Exception declared by the overridden callback signature
 */
@Override
public void afterPropertiesSet() throws Exception {
    this.init();
}
/**
 * Creates the shared Jest client once.
 * <p>
 * The node addresses are read from the injected {@code es.cluster} value, which is split on
 * commas; blank entries are skipped and duplicates collapse while the configured order is kept.
 * Calls made after the client exists return without doing anything.
 *
 * @throws AppException when the configured value yields no usable node URI
 */
public void init() {
    logger.info("ESJestClient Init Start...");
    // Same lock as every other writer of the static client field.
    synchronized (JestService.class) {
        if (client != null) {
            return;
        }
        Set<String> serverUris = new LinkedHashSet<String>();
        if (StringUtils.isNotEmpty(cluster)) {
            for (String uri : cluster.split(",")) {
                String trimmed = uri.trim();
                if (StringUtils.isNotEmpty(trimmed)) {
                    serverUris.add(trimmed);
                }
            }
        }
        // Validate what is actually usable, not the raw array length.
        if (serverUris.isEmpty()) {
            throw new AppException("ESJestClient init error,uri is null,Check scm ! ", ResponseCode.FAIL.getCode());
        }
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris)
                .connTimeout(1000)
                .readTimeout(10000)
                .multiThreaded(true)
                .build());
        client = factory.getObject();
        // Format the whole collection; passing a String[] to %s would print only its first element.
        logger.info(String.format("ESJestClient init success,ip:%s", serverUris));
    }
}
/**
 * Releases the shared Jest client.
 * <p>
 * Shuts the client down and clears the static reference under the same class-level lock that
 * {@code init()} takes, so a later {@code init()} call builds a fresh client instead of
 * seeing the already shut-down instance and returning early. Does nothing when no client exists.
 */
public void destory() {
    synchronized (JestService.class) {
        if (client != null) {
            client.shutdownClient();
            // Drop the stale reference; init() only rebuilds when the field is null.
            client = null;
            logger.debug("ESJestClient destory ! ");
        }
    }
}
/**
 * Reads the mapping of one index/type pair and returns it as JSON text.
 *
 * @param indexName index whose mapping is requested
 * @param typeName  type whose mapping is requested
 * @return the mapping serialized as a string, or {@code null} when the request did not succeed
 * @throws AppException when executing the request or converting the result throws
 */
public String getMapping(String indexName, String typeName) {
    GetMapping.Builder mappingRequest = new GetMapping.Builder();
    mappingRequest.addIndex(indexName).addType(typeName);
    try {
        JestResult response = client.execute(mappingRequest.build());
        if (response == null || !response.isSucceeded()) {
            return null;
        }
        return response.getSourceAsObject(JsonObject.class).toString();
    } catch (Exception e) {
        logger.error("es get mapping Exception ", e);
        throw new AppException("獲取索引資訊失敗", ResponseCode.FAIL.getCode());
    }
}
/**
 * Indexes a single document under the given ID, with the refresh flag set to true.
 * Per the original author's note, an existing document with the same ID is overwritten.
 *
 * @param t        document to index
 * @param uniqueId document ID
 * @param index    target index name
 * @param type     target type name
 * @return {@code true} when the client reports success; {@code false} when it does not or
 *         when an {@code IOException} occurs (the exception is logged, not rethrown)
 */
public boolean insertOrUpdateDoc(T t, String uniqueId, String index, String type) {
    Index.Builder docBuilder = new Index.Builder(t);
    docBuilder.id(uniqueId);
    docBuilder.refresh(true);
    Index request = docBuilder.index(index).type(type).build();
    try {
        JestResult response = client.execute(request);
        return response != null && response.isSucceeded();
    } catch (IOException e) {
        logger.error("ESJestClient insertDoc exception", e);
        return false;
    }
}
/**
 * Indexes a list of documents with one bulk request.
 *
 * @param list  documents to index
 * @param index target index name
 * @param type  target type name
 * @return a result flagged successful with the input list, or flagged failed with the input
 *         list and the failed items reported by the bulk response; {@code null} when any
 *         exception is raised (it is logged, not rethrown)
 */
@SuppressWarnings("unchecked")
public BatchOperaResult batchInsertDoc(List<T> list, String index, String type) {
    try {
        Bulk.Builder bulk = new Bulk.Builder();
        // One index action per document.
        for (T doc : list) {
            bulk.addAction(new Index.Builder(doc).index(index).type(type).build());
        }
        BulkResult outcome = client.execute(bulk.build());
        if (outcome.isSucceeded()) {
            return new BatchOperaResult(true, list);
        }
        return new BatchOperaResult(false, list, outcome.getFailedItems());
    } catch (Exception e) {
        logger.error("ESJestClient.batchInsertDoc-exception", e);
        return null;
    }
}
/**
 * Combined query with scroll pagination; per the original author's note it is meant for
 * large result sets fetched with a large page size.
 * <p>
 * Without a scroll ID in the DTO, a new search is started: every entry of
 * {@code getParams()} becomes a {@code must} match clause and the scroll keep-alive is
 * attached. With a scroll ID, the scroll is advanced {@code getScrollTime()} times (forward
 * only) and the last page is returned.
 *
 * @param index            index to search
 * @param type             type to search
 * @param combinedQueryDto carries the query parameters, page size, scroll ID and scroll count
 * @return the documents of the requested page together with the scroll ID for the next call
 * @throws AppException when a scroll response carries an error (for example an expired scroll
 *                      ID), when no successful result is available, when the page is empty,
 *                      or when the request fails for any other reason
 */
@SuppressWarnings("unchecked")
public CombinedQueryResult combinedQueryByScroll(String index, String type, CombinedQueryDto combinedQueryDto) {
    CombinedQueryResult res;
    String scrollId = combinedQueryDto.getScrollId();
    Map<String, Object> req = combinedQueryDto.getParams();
    JestResult result = null;
    try {
        if (StringUtils.isEmpty(scrollId)) {
            // First query (or caller dropped an expired ID): start from scratch.
            // NOTE(review): the ClearScroll request is built without explicit IDs; presumably
            // that clears every open scroll context, including other callers' - confirm.
            clearScrollIds();
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            if (req != null) {
                for (Map.Entry<String, Object> entry : req.entrySet()) {
                    // must: a document has to match every supplied field value.
                    boolQueryBuilder.must(QueryBuilders.matchQuery(entry.getKey(), entry.getValue()));
                }
            }
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(boolQueryBuilder).size(combinedQueryDto.getSize());
            Search search = new Search.Builder(searchSourceBuilder.toString())
                    .addIndex(index).addType(type).setParameter(Parameters.SCROLL, SCROLL_ALIVE_TIME)
                    .build();
            // The first search takes the snapshot that later scroll calls page through.
            result = client.execute(search);
        } else {
            // Scrolling only moves forward.
            for (int i = 0; i < combinedQueryDto.getScrollTime(); i++) {
                SearchScroll scroll = new SearchScroll.Builder(scrollId, SCROLL_ALIVE_TIME).build();
                result = client.execute(scroll);
                JsonObject body = result == null ? null : result.getJsonObject();
                if (body == null) {
                    continue;
                }
                // Checked on every step, so an expired scroll ID is reported on the very
                // first scroll instead of being skipped.
                JsonElement error = body.get("error");
                if (error != null && !error.isJsonNull()) {
                    JsonElement rootCause = error.isJsonObject()
                            ? error.getAsJsonObject().get("root_cause") : null;
                    // root_cause is not a single primitive, so serialize it instead of getAsString().
                    String message = rootCause != null ? rootCause.toString() : error.toString();
                    throw new AppException(message, ResponseCode.ERROR.getCode());
                }
                // Continue with the scroll ID returned by the latest response.
                JsonElement nextScrollId = body.get(SCORLL_ID_KEY);
                if (nextScrollId != null && nextScrollId.isJsonPrimitive()) {
                    scrollId = nextScrollId.getAsString();
                }
            }
        }
        // A null result (e.g. scroll count of zero) is a failure too, not a page to parse.
        if (result == null || !result.isSucceeded()) {
            throw new AppException("ESJestClient ScrollQuery Fail...", ResponseCode.FAIL.getCode());
        }
        res = buildResponse(result);
    } catch (AppException e) {
        // Keep the specific message and code instead of masking them with the generic error below.
        logger.error("ESJestClient ScrollQuery Exception...", e);
        throw e;
    } catch (IOException e) {
        logger.error("ESJestClient ScrollQuery IOException...", e);
        throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode());
    } catch (Exception e) {
        logger.error("ESJestClient ScrollQuery Exception...", e);
        throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode());
    }
    return res;
}
/**
 * Executes a ClearScroll action that is built without any explicit scroll ID.
 * An {@code IOException} is logged and otherwise ignored.
 * NOTE(review): presumably this clears all open scroll contexts - confirm against the
 * Jest ClearScroll implementation.
 */
private void clearScrollIds() {
    try {
        client.execute(new ClearScroll.Builder().build());
    } catch (IOException e) {
        logger.error("ESJestClient Clean ScrollIds Exception...", e);
    }
}
/**
 * Converts a search result into a {@code CombinedQueryResult}.
 * <p>
 * Example of the response JSON when nothing matched:
 * {"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}
 *
 * @param result search or scroll result to convert
 * @return result object holding the {@code _source} of every hit plus the scroll ID
 * @throws AppException when the response contains no hits; an empty page is treated as a failed query
 */
@SuppressWarnings("unchecked")
private CombinedQueryResult buildResponse(JestResult result) {
    CombinedQueryResult res = new CombinedQueryResult();
    JsonObject body = result.getJsonObject();
    // Hits are nested as hits.hits in the response.
    JsonArray hits = body.getAsJsonObject(QUERY_HITS_KEY).getAsJsonArray(QUERY_HITS_KEY);
    Gson gson = new Gson();
    List<T> docs = new ArrayList<T>();
    for (JsonElement hit : hits) {
        Map<?, ?> hitFields = gson.fromJson(hit, Map.class);
        docs.add((T) hitFields.get(SOURCE_KEY));
    }
    String scrollId = body.getAsJsonPrimitive(SCORLL_ID_KEY).getAsString();
    if (docs.isEmpty()) {
        throw new AppException("ESJestClient ScrollQuery Fail...", ResponseCode.FAIL.getCode());
    }
    res.setResList(docs);
    res.setScrollId(scrollId);
    return res;
}
/**
 * Combined query with from/size pagination; per the original author's note it is meant for
 * small result sets where no deep paging happens.
 * <p>
 * Every entry of {@code getParams()} becomes a {@code must} match clause. The request is a
 * plain search (no scroll keep-alive, no scan search type), and the {@code _source} of each
 * element of {@code hits.hits} is collected into the result.
 *
 * @param index            index to search
 * @param type             type to search
 * @param combinedQueryDto carries the query parameters, page size and offset
 * @return result whose list holds the documents of the requested page; the list is empty
 *         when nothing matched
 * @throws AppException when the search is not successful or the request fails
 */
@SuppressWarnings("unchecked")
public CombinedQueryResult combinedQueryByFromSize(String index, String type, CombinedQueryDto combinedQueryDto) {
    // Instantiate up front so the result can be populated after the query.
    CombinedQueryResult res = new CombinedQueryResult();
    Map<String, Object> req = combinedQueryDto.getParams();
    List<T> list = new ArrayList<T>();
    try {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (req != null) {
            for (Map.Entry<String, Object> entry : req.entrySet()) {
                boolQueryBuilder.must(QueryBuilders.matchQuery(entry.getKey(), entry.getValue()));
            }
        }
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQueryBuilder).size(combinedQueryDto.getSize()).from(combinedQueryDto.getFrom());
        // from/size paging is a regular search; scroll and scan belong to the scroll-based method.
        Search search = new Search.Builder(searchSourceBuilder.toString())
                .addIndex(index).addType(type).build();
        JestResult result = client.execute(search);
        if (result == null || !result.isSucceeded()) {
            throw new AppException("ESJestClient FromSizeQuery Fail...", ResponseCode.FAIL.getCode());
        }
        // hits.hits is an array of hit objects, each carrying its own _source.
        JsonObject hitsObject = result.getJsonObject().getAsJsonObject(QUERY_HITS_KEY);
        JsonArray hits = hitsObject == null ? null : hitsObject.getAsJsonArray(QUERY_HITS_KEY);
        if (hits != null) {
            Gson gson = new Gson();
            for (JsonElement hit : hits) {
                Map<?, ?> hitFields = gson.fromJson(hit, Map.class);
                list.add((T) hitFields.get(SOURCE_KEY));
            }
        }
    } catch (AppException e) {
        // Keep the specific message and code instead of masking them with the generic error below.
        logger.error("ESJestClient FromSizeQuery Exception...", e);
        throw e;
    } catch (IOException e) {
        logger.error("ESJestClient FromSizeQuery Exception...", e);
        throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode());
    } catch (Exception e) {
        logger.error("ESJestClient FromSizeQuery Exception...", e);
        throw new AppException("從ES查詢DOC異常...", ResponseCode.ERROR.getCode());
    }
    res.setResList(list);
    return res;
}
/**
 * Creates an index with the given name.
 *
 * @param index name of the index to create
 * @return {@code true} when the client reports success, {@code false} otherwise
 * @throws AppException when the request raises an {@code IOException}
 */
public boolean createIndex(String index) {
    try {
        JestResult response = client.execute(new CreateIndex.Builder(index).build());
        return response != null && response.isSucceeded();
    } catch (IOException e) {
        logger.error("EsJestClient-createIndex error", e);
        throw new AppException("索引建立失敗");
    }
}
/**
 * Puts a mapping on the given index/type.
 *
 * @param index      target index name
 * @param type       target type name
 * @param mappingStr mapping as a JSON string, e.g.
 *                   {"service_log": {"properties": {"id": {"type": "string"},"service": {"type": "string"},"app_code": {"type": "string"}}}}
 * @return {@code true} when the client reports success; {@code false} when it does not or
 *         when an {@code IOException} occurs (the exception is logged, not rethrown)
 */
public boolean createIndexMapping(String index, String type, String mappingStr) {
    boolean result = false;
    PutMapping putMapping = new PutMapping.Builder(index, type, mappingStr).build();
    try {
        JestResult jestResult = client.execute(putMapping);
        if (jestResult != null && jestResult.isSucceeded()) {
            result = true;
        }
    } catch (IOException e) {
        // Route the failure through the class logger, as the other methods do, not to stderr.
        logger.error("EsJestClient-createIndexMapping error", e);
    }
    return result;
}
}