程式人生 > Elasticsearch Rest-Client 使用教程

Elasticsearch Rest-Client使用教程

Rest-Client使用教程

Elasticsearch curl命令

-XGET一種請求方法

-d 標識以post形式傳入引數 ,寫在請求正文裡面

?pretty=true 以格式化(美化排版)的形式顯示結果

curl -XGET http://localhost:9200/_cluster/health?pretty

--查詢elasticsearch的健康資訊

curl -XGET http://localhost:9200/ --查詢例項的相關資訊

curl -XGET http://localhost:9200/_cluster/nodes/ --得到叢集中節點的相關資訊

curl -XPOST http://localhost:9200/_cluster/nodes/_shutdown --關閉整個叢集

curl -XPOST http://localhost:9200/_cluster/nodes/aaaa/_shutdown --關閉叢集中指定節點

curl -XPOST http://localhost:9200/test --建立名為test的索引

curl -XDELETE http://localhost:9200/test --刪除名為test的索引

curl -XGET 'http://10.10.110.2:19200/benlaitest/_search?pretty=true' -d '{"query":{"multi_match":{"query":"法國","fields":["firstname","lastname"]}}}' --查詢資料(匹配firstname和lastname)

curl http://10.10.110.160:9200/benlaitest/_analyze?analyzer=standard -d 我愛你中國

postman執行請求API:

http://10.10.110.160:9200/_cat/indices?v – Get請求 檢視有多少索引

http://10.10.110.160:9200/benlaitest/_analyze?analyzer=standard --檢視分詞結果

DSL 介紹

Elasticsearch提供豐富且靈活的查詢語言叫做DSL查詢(Query DSL),它允許你構建更加複雜、強大的查詢。DSL(Domain Specific Language特定領域語言)以JSON請求體的形式出現。

DSL 簡單用法

  • 查詢所有的商品:
GET /product_index/product/_search
{
  "query": {
    "match_all": {}
  }
}
  • 查詢商品名稱包含 milk 的商品,同時按照價格降序排序:
GET /product_index/product/_search
{
 "query": {
   "match": {
     "product_name": "milk"
   }
 },
 "sort": [
   {
     "price": "desc"
   }
 ]
}
  • 分頁指定結果欄位查詢商品:
GET /product_index/product/_search
{
  "query": {
    "match_all": {}
  },
  "_source": [
    "product_name",
    "price"
  ],
  "from": 0, ## 從第幾個商品開始查,最開始是 0
  "size": 1  ## 要查幾個結果
}
  • range 用法,查詢數值、時間區間:
GET /product_index/product/_search
{
  "query": {
    "range": {
      "price": {
        "gte": 30.00
      }
    }
  }
}

多搜尋條件組合查詢(最常用)

  • bool 下包括:must(必須匹配,類似於資料庫的 =),must_not(必須不匹配,類似於資料庫的
    !=),should(沒有強制匹配,類似於資料庫的 or),filter(過濾)
GET /product_index/product/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "product_name": "pure milk"
          }
        }
      ],
      "should": [
        {
          "match": {
            "product_desc": "常溫"
          }
        }
      ],
      "must_not": [
        {
          "match": {
            "product_name": "蒙牛"
          }
        }
      ],
      "filter": {
        "range": {
          "price": {
            "gte": 33.00
          }
        }
      }
    }
  }
}     

Rest-Client

Elastic正在開發高階客戶端,它將在REST客戶端之上工作,並允許您傳送DSL查詢等。

案例分析

  1. 需求:將爬蟲的原始資料組裝成DSL語句,每次5000資料量批量新增資料到es
	/**
	 * Saves raw crawler data to Elasticsearch via the _bulk API, flushing
	 * every 5000 records (and once more for the final partial batch).
	 *
	 * Fixes over the original:
	 *  - the batch counter now advances even when a single record throws, so the
	 *    tail-flush condition (count == list.size()) is always reachable and the
	 *    last partial batch is never silently dropped;
	 *  - the request buffer is reset after every flush attempt, successful or not,
	 *    so a failed payload is not re-sent (duplicated) with later batches.
	 *
	 * @param list raw crawler records to persist; iterated in order
	 */
	public void saveCrawlerInstanceData(List<CrawlerInstanceData> list) {
		try {
			String pre = "爬蟲原始資料";

			StringBuilder bulkRequestBody = new StringBuilder();
			// 1-based position in the list; independent of per-record failures.
			int count = 0;
			for (CrawlerInstanceData data : list) {
				count++;
				// Per-record failures are logged and skipped so one bad record
				// does not abort the whole import.
				try {
					Map map = BeanUtil.transBean2Map(data);
					// Store ctime as epoch millis rather than a serialized Date.
					map.put("ctime", data.getCtime().getTime());
					// Index name (suffix) for the crawler-data index family.
					String esIndex = getESIndex(4);

					String requestJson = JSON.toJSONString(map, WriteMapNullValue);
					// Bulk action line: one "index" metadata line per document.
					String actionMetaData = String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" ,\"_id\" : \"%s\"} }%n",
							indexPrefix + esIndex, "crawlerdata", data.getId());
					// Bulk body is newline-delimited: action line, then document line.
					bulkRequestBody.append(actionMetaData);
					bulkRequestBody.append(requestJson);
					bulkRequestBody.append("\n");

					String esPath = String.format("/%s/%s/%s", indexPrefix + esIndex, "crawlerdata", "_bulk");

					// Flush every 5000 records, and on the very last record.
					if (count % 5000 == 0 || count == list.size()) {
						String resultJson = RestClientUtil.getESDtats(restClient, bulkRequestBody.toString(), esPath, "POST");
						// Reset the buffer regardless of outcome so a failed batch
						// is not replayed inside every subsequent batch.
						bulkRequestBody = new StringBuilder();
						if (StringUtils.isBlank(resultJson)) {
							logger.error("{} es POST 批量插入資料操作不成功 ", pre);
							throw new Exception(" es POST 批量插入資料操作不成功");
						}
					}
				} catch (Exception e) {
					logger.error("新增es資訊異常", e);
				}

			}
		} catch (Exception e) {
			logger.error("es儲存爬蟲原始資料異常", e);
		}
	}

封裝RestClient dsl執行語句

/**
 * Executes a DSL/bulk request against Elasticsearch through the low-level
 * REST client and returns the raw response body.
 * (Name kept as-is — callers depend on it — although "getESStats" was
 * presumably intended.)
 *
 * @param restClient  low-level ES REST client; must not be null
 * @param sql         request body (DSL JSON or bulk payload); must not be blank
 * @param esPath      endpoint path, e.g. "/index/type/_search"
 * @param requestType HTTP method; defaults to "GET" when null
 * @return response body as a string, or null on bad arguments or I/O failure
 */
public static String getESDtats(RestClient restClient, String sql, String esPath, String requestType) {
        // Guard clauses: a client, a non-blank body and a target path are all required.
        if (null == restClient || StringUtil.isBlank(sql) || StringUtil.isBlank(esPath)) {
            return null;
        }
        String method = (null == requestType) ? "GET" : requestType;
        HttpEntity entity = new NStringEntity(sql, ContentType.APPLICATION_JSON);
        try {
            Response response = restClient.performRequest(method, esPath, Collections.emptyMap(), entity, new Header[0]);
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            // Best-effort: I/O failures are logged and surfaced to the caller as null.
            LOGGER.error(ExceptionUtils.getStackTrace(e));
            return null;
        }
    }
  1. 需求:聚合查詢爬蟲編號為crawler_instance_id的爬蟲示例成功爬蟲的資料量crawler_cnt,實際轉換成業務資料量update_cnt
	/**
	 * Aggregates, for the crawler instance {@code crawlerInstanceId}, the total
	 * crawled record count (crawler_cnt) and the count converted into business
	 * data (update_cnt).
	 *
	 * @param crawlerInstanceId crawler instance id, used as a numeric term filter
	 * @return map with keys crawl_total / update_total (aggregation sums, as int)
	 *         and data_total (hits.total); missing sections are simply omitted
	 */
	public Map<String, Object> lastTaskDataCount(String crawlerInstanceId) {

		String esIndex = getESIndex(4);

		// Build the DSL body. "_source": [""] suppresses document fields so only
		// the aggregation results come back, which speeds up the query.
		StringBuilder requestBody = new StringBuilder();
		requestBody.append("{\"size\":10,\"_source\":[\"\"],\"from\": 0,");
		requestBody.append("\"query\":{\"term\":{\"crawler_instance_id\":").append(crawlerInstanceId).append("}},");
		// Sum aggregations: records crawled and records stored to business tables.
		requestBody.append("\"aggs\": {\"crawlTotalAgg\": {\"sum\": {\"field\":\"crawler_cnt\"}},");
		requestBody.append("\"updateTotalAgg\":{ \"sum\": {\"field\":\"update_cnt\"}}}}");

		// ES search endpoint for the crawler-data index/type.
		String esPath = String.format("/%s/%s/%s", indexPrefix + esIndex,"crawlerdata", "_search");
		// Fix: use the canonical upper-case HTTP method (was "Get").
		String result = RestClientUtil.getESDtats(restClient, requestBody.toString(), esPath, "GET");

		Map<String, Object> map = new HashMap<String, Object>();

		JSONObject jsonObject = JSONObject.parseObject(result);
		if (null != jsonObject) {
			// getJSONObject avoids the original's serialize-then-reparse round trip.
			JSONObject aggregationsObject = jsonObject.getJSONObject("aggregations");
			if (aggregationsObject != null) {
				// Sum aggregations come back as doubles; callers expect ints.
				map.put("crawl_total",  (int) Double.parseDouble(aggregationsObject.getJSONObject("crawlTotalAgg").getString("value")));
				map.put("update_total", (int) Double.parseDouble(aggregationsObject.getJSONObject("updateTotalAgg").getString("value")));
			}
			JSONObject hitsObject = jsonObject.getJSONObject("hits");
			if (null != hitsObject) {
				map.put("data_total", Integer.parseInt(String.valueOf(hitsObject.get("total"))));
			}
		}

		return map;
	}

聚合查詢DSL語句,_source為空,僅返回聚合查詢資料,加快查詢效率

{
	"size": 10,
	"_source": [""],
	"from": 0,
	"query": {
		"term": {
			"crawler_instance_id": crawler_instance_id
		}
	},
	"aggs": {
		"crawlTotalAgg": {
			"sum": {
				"field": "crawler_cnt"
			}
		},
		"updateTotalAgg": {
			"sum": {
				"field": "update_cnt"
			}
		}
	}
}
  1. 需求:多重聚合查詢指定media_id網站crawler_code爬蟲類型最近7次crawler_instance_id爬蟲示例成功爬蟲的資料量crawler_cnt,實際轉換成業務資料量update_cnt
	/**
	 * Multi-level aggregation: for the site {@code mediaId} and crawler type
	 * {@code crawlerCode}, returns the crawled record count (crawler_cnt) and the
	 * converted business record count (update_cnt) of the latest 7 crawler instances.
	 *
	 * @param crawlerCode crawler type, matched exactly against crawler_code.keyword
	 * @param mediaId     site id, matched as a numeric term against media_id
	 * @return one map per crawler instance with keys crawler_instance_id,
	 *         crawler_code, update_sum and crawl_total
	 */
	public List<Map<String, Object>> lastSevenTaskCount(String crawlerCode, String mediaId) {

		String esIndex = getESIndex(4);


		List<Map<String, Object>> reList = new ArrayList<Map<String, Object>>();

		// Assemble the DSL: bool/must filter, then terms aggregations nested as
		// crawler_instance_id -> crawler_code -> media_id, with sum aggs at the leaf.
		StringBuilder requestBody = new StringBuilder();
		requestBody.append("{\"size\":7,\"_source\":[\"id\"],\"from\": 0,");
		requestBody.append("\"query\":{\"bool\":{\"must\":[{\"term\":{\"crawler_code.keyword\":\"").append(crawlerCode).append("\"}},{");
		requestBody.append("\"term\": {\"media_id\":").append(mediaId).append("}}]}},");
		// NOTE(review): "_term" ordering is deprecated in newer ES ("_key" replaces it);
		// kept as-is for the cluster version in use — confirm before upgrading.
		requestBody.append("\"aggs\": {\"crawler_instance_id_agg\": {\"terms\": {\"field\": \"crawler_instance_id\",\"size\":7,\"order\":{\"_term\": \"desc\"}},");
		requestBody.append("\"aggs\": {\"crawler_code_agg\": {\"terms\": {\"field\":\"crawler_code\"},");
		requestBody.append("\"aggs\": {\"media_id_agg\": {\"terms\": {\"field\":\"media_id\"},");
		// Leaf sums: records stored to business tables and records crawled.
		requestBody.append("\"aggs\": {\"update_sum_agg\": {\"sum\": {\"field\": \"update_cnt\"}},\"crawler_cnt_agg\": {\"sum\": {\"field\": \"crawler_cnt\"").append("}}}}}}}}},");
		requestBody.append("\"sort\": [{\"id\": {\"order\": \"desc\"}}]}");
		// ES search endpoint.
		String esPath = String.format("/%s/%s/%s", indexPrefix + esIndex,"crawlerdata", "_search");
		// Fix: use the canonical upper-case HTTP method (was "Get").
		String result = RestClientUtil.getESDtats(restClient, requestBody.toString(), esPath, "GET");
		List<Map> crawlerInstanceList = RestClientUtil.getAggregationsListByResult(result,"crawler_instance_id_agg");

		crawlerInstanceList.stream().forEach(x->{
			JSONObject crawlerCodeObject = (JSONObject)x.get("crawler_code_agg");
			List<Map> crawlerCodeList =bucketsObject(crawlerCodeObject);
			Map<String, Object> map = new HashMap<String, Object>();
			map.put("crawler_instance_id", x.get("key"));
			// NOTE(review): the inner loops reuse the same map, so if an instance ever
			// has more than one crawler_code/media_id bucket only the last one survives.
			// With the exact-match query above each instance should yield one bucket
			// of each — confirm against real data.
			crawlerCodeList.stream().forEach(y->{
				map.put("crawler_code", y.get("key"));
				JSONObject media = (JSONObject)y.get("media_id_agg");
				List<Map> mediaList =bucketsObject(media);
				mediaList.stream().forEach(z->{
					JSONObject updateSum = (JSONObject)z.get("update_sum_agg");
					// Sum aggregations come back as doubles; callers expect ints.
					map.put("update_sum",  (int)Double.parseDouble(updateSum.getString("value")));
					JSONObject crawlTotal = (JSONObject)z.get("crawler_cnt_agg");
					map.put("crawl_total", (int)Double.parseDouble(crawlTotal.getString("value")));

				});
			});
			reList.add(map);
		});
		return reList;
	}

聚合查詢DSL語句

{
	"size": 7,
	"_source": ["id"],
	"from": 0,
	"query": {
		"bool": {
			"must": [{
				"term": {
					"crawler_code.keyword": "新聞"//爬蟲類型
				}
			}, {
				"term": {
					"media_id": 4//網站編號
				}
			}]
		}
	},
	"aggs": {
		"crawler_instance_id_agg": {
			"terms": {
				"field": "crawler_instance_id",
				"size": 7,
				"order": {
					"_term": "desc"
				}
			},
			"aggs": {
				"crawler_code_agg": {
					"terms": {
						"field": "crawler_code"
					},
					"aggs": {
						"media_id_agg": {
							"terms": {
								"field": "media_id"
							},
							"aggs": {
								"update_sum_agg": {
									"sum": {
										"field": "update_cnt"
									}
								},
								"crawler_cnt_agg": {
									"sum": {
										"field": "crawler_cnt"
									}
								}
							}
						}
					}
				}
			}
		}
	},
	"sort": [{
		"id": {
			"order": "desc"
		}
	}]
}

附註:若只想統計實際有轉換出業務資料的記錄,可在上述 bool 查詢的 must 陣列中再追加一個 range 條件:
{
	"range": {
		"update_cnt": {
			"gt": 0
		}
	}
}
  1. 需求:大資料量Elastic資料遷移方法,使用了查詢ES的scroll方式,對比通過ES的DSL查詢語句用分頁from和size的分頁查詢到了千萬級別之後,from就會慢的出奇,甚至報錯
	//組裝es指定欄位查詢語句
    public Map<String, Object> getEsQueryInfo() {

        Map<String, Object> infoMap = new HashMap<>(16);

        String esQuery = "";

        StringBuilder requestBody = new StringBuilder();
        requestBody.append("{\"size\":\"").append(esSize).append("\",");
        requestBody.append("\"query\": {\"bool\":{\"must\":{\"term\":{\"").append(field).append("\":\"").append(value);
        requestBody.append("\"}}}}}");
        infoMap.put("esQuery", requestBody.toString());
        infoMap.put("oldEsUrl", String.format("/%s/_search?scr