ElasticSearch的增刪改查API介紹
阿新 • • 發佈:2019-02-10
1、基本用法
Elasticsearch叢集可以包含多個索引(indices),每一個索引可以包含多個型別(types),每一個型別包含多個文件(documents),然後每個文件包含多個欄位(Fields),它是面向文件型的儲存。ES比傳統關係型資料庫,就像如下:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
2、建立Client
public ElasticSearchService (String ipAddress, int port) {
client = new TransportClient()
.addTransportAddress(new InetSocketTransportAddress(ipAddress, port));
}
這裡是一個TransportClient。ES下兩種客戶端對比:
(1)TransportClient:輕量級的Client,使用Netty執行緒池,Socket連線到ES叢集。本身不加入到叢集,只作為請求的處理。
(2)Node Client:客戶端節點本身也是ES節點,加入到叢集,和其他ElasticSearch節點一樣。頻繁的開啟和關閉這類Node Clients會在叢集中產生“噪音”。
3、建立/刪除Index和Type資訊
//* 1、 建立索引
public void createIndex() {
client.admin().indices().create(new CreateIndexRequest(IndexName))
.actionGet();
}
// 2、 清除所有索引
public void deleteIndex() {
IndicesExistsResponse indicesExistsResponse = client.admin().indices()
.exists(new IndicesExistsRequest(new String[] { IndexName }))
.actionGet();
if (indicesExistsResponse.isExists()) {
client.admin().indices().delete(new DeleteIndexRequest(IndexName))
.actionGet();
}
}
// 3、 刪除Index下的某個Type
public void deleteType(){
client.prepareDelete().setIndex(IndexName).setType(TypeName)
.execute().actionGet();
}
// 4、 定義索引的對映型別(mapping)
public void defineIndexTypeMapping() {
try {
XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
mapBuilder.startObject()
.startObject(TypeName)
.startObject("properties")
.startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()
.startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()
.startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()
.endObject()
.endObject()
.endObject();
PutMappingRequest putMappingRequest = Requests
.putMappingRequest(IndexName).type(TypeName).source(mapBuilder);
client.admin().indices().putMapping(putMappingRequest).actionGet();
} catch (IOException e) {
log.error(e.toString());
}
}
這裡自定義了某個Type的索引對映(Mapping),預設ES會自動處理資料型別的對映:針對整型對映為long,浮點數為double,字串對映為string,時間為date,true或false為boolean。
注意:針對字串,ES預設會做“analyzed”處理,即先做分詞、去掉stop words等處理再index。如果你需要把一個字串做為整體被索引到,需要把這個欄位這樣設定:field(“index”, “not_analyzed”)。
4、查詢索引資料
// 批量索引資料
public void indexHotSpotDataList(List<Hotspotdata> dataList) {
if (dataList != null) {
int size = dataList.size();
if (size > 0) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < size; ++i) {
Hotspotdata data = dataList.get(i);
String jsonSource = getIndexDataFromHotspotData(data);
if (jsonSource != null) {
bulkRequest.add(client.prepareIndex(IndexName, TypeName,
data.getId().toString())
.setRefresh(true).setSource(jsonSource));
}
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
Iterator<BulkItemResponse> iter = bulkResponse.iterator();
while (iter.hasNext()) {
BulkItemResponse itemResponse = iter.next();
if (itemResponse.isFailed()) {
log.error(itemResponse.getFailureMessage());
}
}
}
}
}
}
// 索引資料
public boolean indexHotspotData(Hotspotdata data) {
String jsonSource = getIndexDataFromHotspotData(data);
if (jsonSource != null) {
IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
TypeName).setRefresh(true);
requestBuilder.setSource(jsonSource)
.execute().actionGet();
return true;
}
return false;
}
// 得到索引字串
public String getIndexDataFromHotspotData(Hotspotdata data) {
String jsonString = null;
if (data != null) {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jsonBuilder.startObject().field(IDFieldName, data.getId())
.field(SeqNumFieldName, data.getSeqNum())
.field(IMSIFieldName, data.getImsi())
.field(IMEIFieldName, data.getImei())
.field(DeviceIDFieldName, data.getDeviceID())
.field(OwnAreaFieldName, data.getOwnArea())
.field(TeleOperFieldName, data.getTeleOper())
.field(TimeFieldName, data.getCollectTime())
.endObject();
jsonString = jsonBuilder.string();
} catch (IOException e) {
log.equals(e);
}
}
return jsonString;
}
ES支援批量和單個數據索引。
5、查詢文件資料
//* 獲取少量資料100個
private List<Integer> getSearchData(QueryBuilder queryBuilder) {
List<Integer> ids = new ArrayList<>();
SearchResponse searchResponse = client.prepareSearch(IndexName)
.setTypes(TypeName).setQuery(queryBuilder).setSize(100)
.execute().actionGet();
SearchHits searchHits = searchResponse.getHits();
for (SearchHit searchHit : searchHits) {
Integer id = (Integer) searchHit.getSource().get("id");
ids.add(id);
}
return ids;
}
// 獲取大量資料
private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
List<Integer> ids = new ArrayList<>();
// 一次獲取100000資料
SearchResponse scrollResp = client.prepareSearch(IndexName)
.setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
.setQuery(queryBuilder).setSize(100000).execute().actionGet();
while (true) {
for (SearchHit searchHit : scrollResp.getHits().getHits()) {
Integer id = (Integer) searchHit.getSource().get(IDFieldName);
ids.add(id);
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
}
return ids;
}
這裡的QueryBuilder是一個查詢條件,ES支援分頁查詢獲取資料,也可以一次性獲取大量資料,需要使用Scroll Search。
6、聚合(Aggregation Facet)查詢
//* 得到某段時間內裝置列表上每個裝置的資料分佈情況<裝置ID,數量>
public Map<String, String> getDeviceDistributedInfo(String startTime,
String endTime, List<String> deviceList) {
Map<String, String> resultsMap = new HashMap<>();
QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(deviceQueryBuilder)
.must(rangeBuilder);
TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg")
.size(Integer.MAX_VALUE)
.field(DeviceIDFieldName);
SearchResponse searchResponse = client.prepareSearch(IndexName)
.setQuery(queryBuilder)
.addAggregation(termsBuilder)
.execute().actionGet();
Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");
if (terms != null) {
for (Terms.Bucket entry : terms.getBuckets()) {
resultsMap.put(entry.getKey(),
String.valueOf(entry.getDocCount()));
}
}
return resultsMap;
}