Elasticsearch Rest-Client Tutorial
Elasticsearch curl Commands
-XGET specifies the HTTP request method
-d passes parameters POST-style, i.e. in the request body
?pretty=true pretty-prints the response
curl -XGET http://localhost:9200/_cluster/health?pretty
curl -XGET http://localhost:9200/ --query basic information about this instance
curl -XGET http://localhost:9200/_cluster/nodes/ --get information about the nodes in the cluster
curl -XPOST http://localhost:9200/_cluster/nodes/_shutdown --shut down the whole cluster
curl -XPOST http://localhost:9200/_cluster/nodes/aaaa/_shutdown --shut down the node named aaaa
curl -XPOST http://localhost:9200/test --create an index named test
curl -XDELETE http://localhost:9200/test --delete the index named test
curl -XGET 'http://10.10.110.2:19200/benlaitest/_search?pretty=true' -d '{"query":{"multi_match":{"query":"法國","fields":["firstname","lastname"]}}}' --search documents (match on firstname and lastname)
curl http://10.10.110.160:9200/benlaitest/_analyze?analyzer=standard -d '我愛你中國'
Running the same request in Postman:
http://10.10.110.160:9200/benlaitest/_analyze?analyzer=standard --view the tokenization result
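With the standard analyzer, CJK text is tokenized into single characters, so the response looks roughly like the following (abridged; offsets and positions are illustrative):
{
  "tokens": [
    { "token": "我", "start_offset": 0, "end_offset": 1, "type": "<IDEOGRAPHIC>", "position": 0 },
    { "token": "愛", "start_offset": 1, "end_offset": 2, "type": "<IDEOGRAPHIC>", "position": 1 },
    ...
  ]
}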
Introduction to the DSL
Elasticsearch provides a rich and flexible query language called the Query DSL, which lets you build more complex and powerful queries. The DSL (Domain Specific Language) is expressed as a JSON request body.
Basic DSL Usage
- Query all products:
GET /product_index/product/_search
{
"query": {
"match_all": {}
}
}
- Query products whose name contains milk, sorted by price in descending order:
GET /product_index/product/_search
{
"query": {
"match": {
"product_name": "milk"
}
},
"sort": [
{
"price": "desc"
}
]
}
- Paginated query that returns only the specified result fields:
GET /product_index/product/_search
{
"query": {
"match_all": {}
},
"_source": [
"product_name",
"price"
],
"from": 0, ## 從第幾個商品開始查,最開始是 0
"size": 1 ## 要查幾個結果
}
- range usage: query numeric or time intervals:
GET /product_index/product/_search
{
"query": {
"range": {
"price": {
"gte": 30.00
}
}
}
}
Combining multiple search conditions (the most common case)
- bool can contain: must (must match, like = in a database), must_not (must not match, like !=), should (optional match, like OR in a database), and filter (filtering, without scoring)
GET /product_index/product/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"product_name": "pure milk"
}
}
],
"should": [
{
"match": {
"product_desc": "常溫"
}
}
],
"must_not": [
{
"match": {
"product_name": "蒙牛"
}
}
],
"filter": {
"range": {
"price": {
"gte": 33.00
}
}
}
}
}
}
Rest-Client
Elastic is also developing a high-level client that runs on top of the REST client and lets you send DSL queries and more.
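The case studies below assume an already-built low-level RestClient. A minimal construction sketch (the host and port are placeholders):
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

// Build a low-level REST client against a single node
RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http")).build();

// ... use the client ...

// Release the underlying connections when done
restClient.close();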
Case Studies
- Requirement: assemble the raw crawler data into bulk DSL statements and batch-index it into ES, 5,000 records at a time
// Save the raw crawler data
public void saveCrawlerInstanceData(List<CrawlerInstanceData> list) {
    // Bulk insert
    try {
        String pre = "crawler raw data";
        StringBuilder bulkRequestBody = new StringBuilder();
        // Running counter
        int count = 1;
        for (CrawlerInstanceData data : list) {
            // Guard each record so a single failure does not abort the whole batch
            try {
                Map map = BeanUtil.transBean2Map(data);
                map.put("ctime", data.getCtime().getTime());
                // Index name
                String esIndex = getESIndex(4);
                String requestJson = JSON.toJSONString(map, WriteMapNullValue);
                // The bulk API is newline-delimited; use "\n" rather than the platform-dependent %n
                String actionMetaData = String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" ,\"_id\" : \"%s\"} }\n",
                        indexPrefix + esIndex, "crawlerdata", data.getId());
                // Assemble the NDJSON bulk body: action metadata line, then the document line
                bulkRequestBody.append(actionMetaData);
                bulkRequestBody.append(requestJson);
                bulkRequestBody.append("\n");
                String esPath = String.format("/%s/%s/%s", indexPrefix + esIndex, "crawlerdata", "_bulk");
                // Flush every 5,000 records, and once more for the final partial batch
                if (count % 5000 == 0 || count == list.size()) {
                    String resultJson = RestClientUtil.getESDtats(restClient, bulkRequestBody.toString(), esPath, "POST");
                    if (StringUtils.isBlank(resultJson)) {
                        logger.error("{} es POST bulk insert failed", pre);
                        throw new Exception("es POST bulk insert failed");
                    } else {
                        // Reset the buffer for the next batch
                        bulkRequestBody = new StringBuilder();
                    }
                }
            } catch (Exception e) {
                logger.error("Failed to index record into es", e);
            }
            // Count every record, even failed ones, so the final flush still triggers
            count++;
        }
    } catch (Exception e) {
        logger.error("Failed to save raw crawler data to es", e);
    }
}
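Each flush sends a newline-delimited (NDJSON) body to the _bulk endpoint. With illustrative index name, ids, and field values it looks like:
{ "index" : { "_index" : "crawler_2018", "_type" : "crawlerdata", "_id" : "1001" } }
{"id":1001,"crawler_cnt":120,"update_cnt":98,"ctime":1514736000000}
{ "index" : { "_index" : "crawler_2018", "_type" : "crawlerdata", "_id" : "1002" } }
{"id":1002,"crawler_cnt":80,"update_cnt":75,"ctime":1514736000000}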
Wrapping DSL execution in a RestClient utility
public static String getESDtats(RestClient restClient, String sql, String esPath, String requestType) {
    if (null != restClient && !StringUtil.isBlank(sql) && !StringUtil.isBlank(esPath)) {
        // Wrap the DSL body as a JSON entity
        HttpEntity entity = new NStringEntity(sql, ContentType.APPLICATION_JSON);
        String result = null;
        try {
            // Default to GET when no request type is supplied
            Response indexResponse = restClient.performRequest(null == requestType ? "GET" : requestType, esPath, Collections.emptyMap(), entity, new Header[0]);
            result = EntityUtils.toString(indexResponse.getEntity());
        } catch (IOException e) {
            LOGGER.error(ExceptionUtils.getStackTrace(e));
        }
        return result;
    } else {
        return null;
    }
}
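A typical call looks like this (the index name and query body are illustrative):
// Hypothetical usage: run a match_all search and get the raw JSON response
String body = "{\"query\":{\"match_all\":{}}}";
String json = RestClientUtil.getESDtats(restClient, body, "/test/crawlerdata/_search", "GET");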
- Requirement: for the crawler instance identified by crawler_instance_id, aggregate the successfully crawled data volume (crawler_cnt) and the volume actually converted into business data (update_cnt)
public Map<String, Object> lastTaskDataCount(String crawlerInstanceId) {
    String esIndex = getESIndex(4);
    // Assemble the DSL JSON body
    StringBuilder requestBody = new StringBuilder();
    requestBody.append("{\"size\":10,\"_source\":[\"\"],\"from\": 0,");
    requestBody.append("\"query\":{\"term\":{\"crawler_instance_id\":").append(crawlerInstanceId).append("}},");
    // Sum aggregations: stored data volume and crawled data volume
    requestBody.append("\"aggs\": {\"crawlTotalAgg\": {\"sum\": {\"field\":\"crawler_cnt\"}},");
    requestBody.append("\"updateTotalAgg\":{ \"sum\": {\"field\":\"update_cnt\"}}}}");
    // ES search path
    String esPath = String.format("/%s/%s/%s", indexPrefix + esIndex, "crawlerdata", "_search");
    // HTTP method names are case-sensitive, so use "GET"
    String result = RestClientUtil.getESDtats(restClient, requestBody.toString(), esPath, "GET");
    Map<String, Object> map = new HashMap<String, Object>();
    JSONObject jsonObject = JSONObject.parseObject(result);
    if (null != jsonObject) {
        String aggregations = jsonObject.getString("aggregations");
        JSONObject aggregationsObject = JSONObject.parseObject(aggregations);
        if (aggregationsObject != null) {
            // Aggregation values come back as doubles; truncate to int
            map.put("crawl_total", (int) Double.parseDouble(JSONObject.parseObject(aggregationsObject.getString("crawlTotalAgg")).getString("value")));
            map.put("update_total", (int) Double.parseDouble(JSONObject.parseObject(aggregationsObject.getString("updateTotalAgg")).getString("value")));
        }
        JSONObject hitsObject = JSONObject.parseObject(String.valueOf(jsonObject.get("hits")));
        if (null != hitsObject) {
            map.put("data_total", Integer.parseInt(String.valueOf(hitsObject.get("total"))));
        }
    }
    return map;
}
The aggregation DSL statement; _source is an empty list so the hits carry no field data and essentially only the aggregation results are returned, which speeds up the query
{
"size": 10,
"_source": [""],
"from": 0,
"query": {
"term": {
"crawler_instance_id": crawler_instance_id
}
},
"aggs": {
"crawlTotalAgg": {
"sum": {
"field": "crawler_cnt"
}
},
"updateTotalAgg": {
"sum": {
"field": "update_cnt"
}
}
}
}
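The parsing code above expects a response shaped roughly as follows (abridged, values illustrative):
{
  "hits": { "total": 3, ... },
  "aggregations": {
    "crawlTotalAgg": { "value": 1250.0 },
    "updateTotalAgg": { "value": 980.0 }
  }
}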
- Requirement: multi-level aggregation: for a given site (media_id) and crawler type (crawler_code), aggregate the successfully crawled data volume (crawler_cnt) and the volume actually converted into business data (update_cnt) for the last 7 crawler instances (crawler_instance_id)
// Data for the last 7 crawler runs
public List<Map<String, Object>> lastSevenTaskCount(String crawlerCode, String mediaId) {
    String esIndex = getESIndex(4);
    List<Map<String, Object>> reList = new ArrayList<Map<String, Object>>();
    // Assemble the DSL JSON body
    StringBuilder requestBody = new StringBuilder();
    requestBody.append("{\"size\":7,\"_source\":[\"id\"],\"from\": 0,");
    requestBody.append("\"query\":{\"bool\":{\"must\":[{\"term\":{\"crawler_code.keyword\":\"").append(crawlerCode).append("\"}},{");
    requestBody.append("\"term\": {\"media_id\":").append(mediaId).append("}}]}},");
    requestBody.append("\"aggs\": {\"crawler_instance_id_agg\": {\"terms\": {\"field\": \"crawler_instance_id\",\"size\":7,\"order\":{\"_term\": \"desc\"}},");
    requestBody.append("\"aggs\": {\"crawler_code_agg\": {\"terms\": {\"field\":\"crawler_code\"},");
    requestBody.append("\"aggs\": {\"media_id_agg\": {\"terms\": {\"field\":\"media_id\"},");
    // Sum aggregations: updated/stored data volume and crawled data volume
    requestBody.append("\"aggs\": {\"update_sum_agg\": {\"sum\": {\"field\": \"update_cnt\"}},\"crawler_cnt_agg\": {\"sum\": {\"field\": \"crawler_cnt\"").append("}}}}}}}}},");
    requestBody.append("\"sort\": [{\"id\": {\"order\": \"desc\"}}]}");
    // ES search path
    String esPath = String.format("/%s/%s/%s", indexPrefix + esIndex, "crawlerdata", "_search");
    String result = RestClientUtil.getESDtats(restClient, requestBody.toString(), esPath, "GET");
    // Walk the nested aggregation buckets: instance -> code -> media -> sums
    List<Map> crawlerInstanceList = RestClientUtil.getAggregationsListByResult(result, "crawler_instance_id_agg");
    crawlerInstanceList.stream().forEach(x -> {
        JSONObject crawlerCodeObject = (JSONObject) x.get("crawler_code_agg");
        List<Map> crawlerCodeList = bucketsObject(crawlerCodeObject);
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("crawler_instance_id", x.get("key"));
        crawlerCodeList.stream().forEach(y -> {
            map.put("crawler_code", y.get("key"));
            JSONObject media = (JSONObject) y.get("media_id_agg");
            List<Map> mediaList = bucketsObject(media);
            mediaList.stream().forEach(z -> {
                JSONObject updateSum = (JSONObject) z.get("update_sum_agg");
                map.put("update_sum", (int) Double.parseDouble(updateSum.getString("value")));
                JSONObject crawlTotal = (JSONObject) z.get("crawler_cnt_agg");
                map.put("crawl_total", (int) Double.parseDouble(crawlTotal.getString("value")));
            });
        });
        reList.add(map);
    });
    return reList;
}
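The helpers getAggregationsListByResult and bucketsObject are referenced above but not shown. A minimal sketch of bucketsObject, assuming it simply unwraps the buckets array of a terms aggregation (the real implementation may differ):
// Hypothetical reconstruction: pull the "buckets" array out of a terms aggregation node
private static List<Map> bucketsObject(JSONObject aggObject) {
    List<Map> list = new ArrayList<Map>();
    if (null != aggObject) {
        JSONArray buckets = aggObject.getJSONArray("buckets");
        if (null != buckets) {
            for (int i = 0; i < buckets.size(); i++) {
                // fastjson's JSONObject implements Map, so buckets can be added directly
                list.add(buckets.getJSONObject(i));
            }
        }
    }
    return list;
}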
The aggregation DSL statement:
{
"size": 7,
"_source": ["id"],
"from": 0,
"query": {
"bool": {
"must": [{
"term": {
"crawler_code.keyword": "新聞"//爬蟲類型
}
}, {
"term": {
"media_id": 4//網站編號
}
}]
}
},
"aggs": {
"crawler_instance_id_agg": {
"terms": {
"field": "crawler_instance_id",
"size": 7,
"order": {
"_term": "desc"
}
},
"aggs": {
"crawler_code_agg": {
"terms": {
"field": "crawler_code"
},
"aggs": {
"media_id_agg": {
"terms": {
"field": "media_id"
},
"aggs": {
"update_sum_agg": {
"sum": {
"field": "update_cnt"
}
},
"crawler_cnt_agg": {
"sum": {
"field": "crawler_cnt"
}
}
}
}
}
}
}
}
},
"sort": [{
"id": {
"order": "desc"
}
}]
}
- Requirement: migrate large volumes of Elasticsearch data using the scroll API. With ordinary from/size pagination in the DSL, once you page into the tens of millions of records, from becomes painfully slow and can even error out.
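For reference, the scroll flow looks roughly like this (index name, scroll window, and page size are illustrative):
curl -XGET 'http://localhost:9200/old_index/_search?scroll=1m' -d '{"size":1000,"query":{"match_all":{}}}' --open a scroll context; the response contains a _scroll_id
curl -XGET 'http://localhost:9200/_search/scroll' -d '{"scroll":"1m","scroll_id":"<_scroll_id from the previous response>"}' --fetch the next page; repeat until no hits are returned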
// Build the ES term query on the configured field
public Map<String, Object> getEsQueryInfo() {
    Map<String, Object> infoMap = new HashMap<>(16);
    String esQuery = "";
    StringBuilder requestBody = new StringBuilder();
    requestBody.append("{\"size\":\"").append(esSize).append("\",");
    requestBody.append("\"query\": {\"bool\":{\"must\":{\"term\":{\"").append(field).append("\":\"").append(value);
    requestBody.append("\"}}}}}");
    infoMap.put("esQuery", requestBody.toString());
    infoMap.put("oldEsUrl", String.format("/%s/_search?scr