Elasticsearch(二):使用JAVA API實現簡單查詢、聚合查詢
阿新 • • 發佈:2019-02-15
ES版本:2.3.1
JDK:1.8
所需的jar包請從ES安裝路徑下的jars目錄中獲取,不要使用其他來源或版本的jar,否則容易出現版本衝突問題!
注意:程式碼中的 TransportClient client=ESLink.getTransportClient(); 所使用的連線類 ESLink 見上一篇文章。
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.min.InternalMin;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by LCY on 4/8/2018.
 *
 * Query, delete and aggregation helpers built on the Elasticsearch 2.x Java API.
 *
 * <p>Every public method that talks to the cluster obtains a client from
 * {@code ESLink.getTransportClient()} and closes it in a {@code finally} block, so the client is
 * released even when the request throws.
 * NOTE(review): if ESLink hands out a shared client, closing it per call would break later calls —
 * confirm against ESLink.
 */
public class ESUtils {

    /**
     * Simple match query, roughly {@code select * from {index} where {field} matches {Accept} limit {size}}.
     *
     * <p>This is a full-text {@code match} query, so {@code Accept} is analyzed; it is not a strict
     * equality filter.
     *
     * @param index  index to search
     * @param type   mapping type to search (can be looked up with the head tool)
     * @param field  field to match against
     * @param Accept text to match
     * @param size   maximum number of hits to return
     * @return the {@code _source} JSON string of each hit, in response order
     */
    public List<String> queryByFilter_Accept(String index, String type, String field, String Accept, int size) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SearchResponse response = client.prepareSearch(index)
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setTypes(type) // type is set when the index is created; also visible in the head tool
                    .setQuery(QueryBuilders.matchQuery(field, Accept))
                    .setFrom(0)
                    .setSize(size)
                    .setExplain(true)
                    .execute()
                    .actionGet();
            return sourcesOf(response);
        } finally {
            client.close();
        }
    }

    /**
     * Fetches documents without a query, roughly {@code select * from {index} limit {size}}.
     *
     * @param index index to search
     * @param type  mapping type to search (can be looked up with the head tool)
     * @param size  maximum number of hits to return
     * @return the {@code _source} JSON string of each hit, in response order
     */
    public List<String> queryByFilter(String index, String type, int size) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SearchResponse response = client.prepareSearch(index)
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setTypes(type)
                    .setFrom(0)
                    .setSize(size)
                    .setExplain(true)
                    .execute()
                    .actionGet();
            return sourcesOf(response);
        } finally {
            client.close();
        }
    }

    /** Collects the {@code _source} JSON string of every hit in {@code response}. */
    private static List<String> sourcesOf(SearchResponse response) {
        List<String> docList = new ArrayList<String>();
        for (SearchHit hit : response.getHits()) {
            docList.add(hit.getSourceAsString());
        }
        return docList;
    }

    /**
     * Deletes one document by id, roughly {@code DELETE FROM {index} WHERE id = {id}}.
     *
     * @param index index holding the document
     * @param type  mapping type of the document
     * @param id    document id
     * @return {@code true} if the delete response reports the document as found
     */
    public boolean deleteDoc(String index, String type, String id) {
        TransportClient client = ESLink.getTransportClient();
        try {
            DeleteResponse deleteResponse = client
                    .prepareDelete()
                    .setIndex(index)
                    .setType(type)
                    .setId(id)
                    .get();
            return deleteResponse.isFound();
        } finally {
            client.close();
        }
    }

    /**
     * Minimum of a field via a {@code min} aggregation, roughly {@code SELECT MIN({field}) FROM {index}}.
     *
     * @param index index to aggregate over
     * @param type  mapping type to aggregate over
     * @param field numeric field to take the minimum of
     * @return the value of the "min" aggregation
     */
    public double minDoc(String index, String type, String field) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SearchResponse response = client
                    .prepareSearch(index)
                    .setTypes(type)
                    .addAggregation(AggregationBuilders.min("min").field(field))
                    .get();
            InternalMin min = response.getAggregations().get("min");
            return min.getValue();
        } finally {
            client.close();
        }
    }

    /**
     * Maximum of a field via a {@code max} aggregation, roughly {@code SELECT MAX({field}) FROM {index}}.
     *
     * @param index index to aggregate over
     * @param type  mapping type to aggregate over
     * @param field numeric field to take the maximum of
     * @return the value of the "max" aggregation
     */
    public double maxDoc(String index, String type, String field) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SearchResponse response = client
                    .prepareSearch(index)
                    .setTypes(type)
                    .addAggregation(AggregationBuilders.max("max").field(field))
                    .get();
            InternalMax max = response.getAggregations().get("max");
            return max.getValue();
        } finally {
            // Previously this method never closed its client.
            client.close();
        }
    }

    /**
     * Sum of a field via a {@code sum} aggregation, roughly {@code SELECT SUM({field}) FROM {index}}.
     *
     * @param index index to aggregate over
     * @param type  mapping type to aggregate over
     * @param field numeric field to sum
     * @return the value of the "sum" aggregation
     */
    public double sumDoc(String index, String type, String field) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SearchResponse response = client
                    .prepareSearch(index)
                    .setTypes(type)
                    .addAggregation(AggregationBuilders.sum("sum").field(field))
                    .get();
            InternalSum sum = response.getAggregations().get("sum");
            return sum.getValue();
        } finally {
            client.close();
        }
    }

    /**
     * Terms aggregation on {@code group_name}; prints every bucket key with its document count.
     *
     * <p>NOTE(review): {@code size} goes to the search request's {@code setSize} (number of hits),
     * not to the terms builder, whose own size is never set here — confirm which limit is intended.
     *
     * @param index      index to aggregate over
     * @param type       mapping type to aggregate over
     * @param group_name field to group by
     * @param size       value passed to {@code setSize} on the search request
     */
    public void testSumGroupBy(String index, String type, String group_name, int size) {
        TransportClient client = ESLink.getTransportClient();
        try {
            TermsBuilder termsBuilder = AggregationBuilders.terms("group_return").field(group_name);
            SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type)
                    .addAggregation(termsBuilder)
                    .setSize(size);
            SearchResponse sr = searchRequestBuilder.execute().actionGet();
            Terms groups = sr.getAggregations().get("group_return");
            for (Terms.Bucket entry : groups.getBuckets()) {
                System.out.println("Key: " + entry.getKey() + "\t\tDoc:" + entry.getDocCount());
            }
        } finally {
            client.close();
        }
    }

    /**
     * Nested terms aggregation; prints every outer bucket and, indented, its inner buckets.
     * Roughly {@code select {group_name1},{group_name2} from {index} GROUP BY {group_name1}, {group_name2}}.
     *
     * @param index       index to aggregate over
     * @param type        mapping type to aggregate over
     * @param group_name1 field for the outer grouping
     * @param group_name2 field for the inner grouping
     * @param size        value passed to {@code setSize} on the search request (number of hits)
     * @param timeOut     search timeout string, e.g. "600ms"
     */
    public void getGroupBy(String index, String type, String group_name1, String group_name2, int size, String timeOut) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SearchResponse sr = searchNestedTerms(client, index, type, group_name1, group_name2, size, timeOut);
            Terms group1 = sr.getAggregations().get("group_return1");
            for (Terms.Bucket entry_group1 : group1.getBuckets()) {
                System.out.println("Key: " + entry_group1.getKey() + "\t\tCountDoc:" + entry_group1.getDocCount());
                Terms group2 = entry_group1.getAggregations().get("group_return2");
                for (Terms.Bucket entry_group2 : group2.getBuckets()) {
                    System.out.println("\t\t" + "Key: " + entry_group2.getKey() + "\t\tCountDoc:" + entry_group2.getDocCount());
                }
            }
        } finally {
            client.close();
        }
    }

    /**
     * Nested terms aggregation collected into a three-level map mirroring the aggregation JSON.
     *
     * <p>Outer map key: {@code size} for the first outer bucket, decreasing by one per bucket.
     * Each value holds "Key" (outer bucket key), "DocCount" (its doc count) and "Buckets"
     * (map of every inner bucket key to its doc count, possibly empty). Both maps keep the bucket
     * order returned by Elasticsearch.
     *
     * @param index       index to aggregate over
     * @param type        mapping type to aggregate over
     * @param group_name1 field for the outer grouping
     * @param group_name2 field for the inner grouping
     * @param size        value passed to {@code setSize} on the search request; also the first map key
     * @param timeOut     search timeout string, e.g. "600ms"
     * @return the three-level result map described above
     */
    public HashMap<Integer, HashMap<String, Object>> getGroupByMap(String index, String type, String group_name1, String group_name2, int size, String timeOut) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SearchResponse sr = searchNestedTerms(client, index, type, group_name1, group_name2, size, timeOut);
            return nestedBucketsToMap(sr, size, -1);
        } finally {
            client.close();
        }
    }

    /**
     * Nested terms aggregation collected into a three-level map, requesting no hits.
     *
     * <p>Same map layout as the overload taking {@code size}, except the outer keys run
     * 0, 1, 2, ... in bucket order.
     *
     * @param index       index to aggregate over
     * @param type        mapping type to aggregate over
     * @param group_name1 field for the outer grouping
     * @param group_name2 field for the inner grouping
     * @param timeOut     search timeout string, e.g. "600ms"
     * @return the three-level result map
     */
    public HashMap<Integer, HashMap<String, Object>> getGroupByMap(String index, String type, String group_name1, String group_name2, String timeOut) {
        TransportClient client = ESLink.getTransportClient();
        try {
            // setSize(0): only the aggregation is needed, not the hits.
            SearchResponse sr = searchNestedTerms(client, index, type, group_name1, group_name2, 0, timeOut);
            return nestedBucketsToMap(sr, 0, 1);
        } finally {
            client.close();
        }
    }

    /**
     * Runs a search with terms aggregation "group_return1" on {@code group_name1} and, nested under
     * it, terms aggregation "group_return2" on {@code group_name2}.
     */
    private SearchResponse searchNestedTerms(TransportClient client, String index, String type,
            String group_name1, String group_name2, int size, String timeOut) {
        TermsBuilder outer = AggregationBuilders.terms("group_return1").field(group_name1);
        TermsBuilder inner = AggregationBuilders.terms("group_return2").field(group_name2);
        outer.subAggregation(inner);
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type)
                .addAggregation(outer)
                .setSize(size)
                .setTimeout(timeOut);
        return searchRequestBuilder.execute().actionGet();
    }

    /**
     * Converts the "group_return1"/"group_return2" buckets of {@code sr} into the three-level map;
     * the first outer bucket gets key {@code firstKey} and each following one {@code step} more.
     */
    private HashMap<Integer, HashMap<String, Object>> nestedBucketsToMap(SearchResponse sr, int firstKey, int step) {
        HashMap<Integer, HashMap<String, Object>> res = new LinkedHashMap<>();
        Terms group1 = sr.getAggregations().get("group_return1");
        int key = firstKey;
        for (Terms.Bucket entry_group1 : group1.getBuckets()) {
            HashMap<String, Object> res1 = new HashMap<String, Object>();
            res1.put("Key", entry_group1.getKey());
            res1.put("DocCount", entry_group1.getDocCount());
            // One map for ALL inner buckets: creating it per inner iteration would keep only the last one.
            HashMap<Object, Long> res2 = new LinkedHashMap<>();
            Terms group2 = entry_group1.getAggregations().get("group_return2");
            for (Terms.Bucket entry_group2 : group2.getBuckets()) {
                res2.put(entry_group2.getKey(), entry_group2.getDocCount());
            }
            res1.put("Buckets", res2);
            res.put(key, res1);
            key += step;
        }
        return res;
    }

    /**
     * Runs the nested aggregation of {@code getGroupByMap} and prints each outer bucket followed by
     * its inner buckets, indented.
     *
     * @param index       index to aggregate over
     * @param type        mapping type to aggregate over
     * @param group_name1 field for the outer grouping
     * @param group_name2 field for the inner grouping
     * @param size        value passed through to {@code getGroupByMap}
     * @param timeOut     search timeout string, e.g. "600ms"
     */
    public void getGroupPrint(String index, String type, String group_name1, String group_name2, int size, String timeOut) {
        HashMap<Integer, HashMap<String, Object>> groups = getGroupByMap(index, type, group_name1, group_name2, size, timeOut);
        for (HashMap<String, Object> group : groups.values()) {
            System.out.println("Key: " + group.get("Key") + "\t\tCountDoc:" + group.get("DocCount"));
            Map<?, ?> buckets = (Map<?, ?>) group.get("Buckets");
            if (buckets == null) {
                continue;
            }
            for (Map.Entry<?, ?> bucket : buckets.entrySet()) {
                System.out.println("\t\t" + "Key: " + bucket.getKey() + "\t\tCountDoc:" + bucket.getValue());
            }
        }
    }

    /**
     * Sums {@code group_name} over the documents matching {@code field_name}/{@code field_value};
     * roughly {@code select sum({group_name}) from {index} where {field_name} = {field_value}}.
     *
     * <p>NOTE(review): a {@code match} query is analyzed, so for analyzed fields this is not a strict
     * exact-value filter; a term query may be what is intended — confirm.
     *
     * @param index       index to search
     * @param type        mapping type to search
     * @param field_name  field to filter on
     * @param field_value value to match in {@code field_name}
     * @param group_name  numeric field to sum
     * @param timeOut     search timeout string, e.g. "600ms"
     * @return the value of the "group_sum" aggregation
     */
    public double getGroupSum(String index, String type, String field_name, String field_value, String group_name, String timeOut) {
        TransportClient client = ESLink.getTransportClient();
        try {
            SumBuilder sumBuilder = AggregationBuilders.sum("group_sum").field(group_name);
            SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setTypes(type)
                    .setQuery(QueryBuilders.matchQuery(field_name, field_value))
                    .addAggregation(sumBuilder)
                    .setSize(0) // only the aggregation is needed, not the hits
                    .setTimeout(timeOut);
            SearchResponse sr = searchRequestBuilder.execute().actionGet();
            InternalSum amount = sr.getAggregations().get("group_sum");
            return amount.value();
        } finally {
            client.close();
        }
    }
}
測試程式碼:
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ESTest {

    /** Prints up to 10 "order" documents whose create_date matches "2011-11-10". */
    @Test
    public void testqueryByFilterr_Accept() {
        try {
            ESUtils utils = new ESUtils();
            List<String> docList = utils.queryByFilter_Accept("order", "order", "create_date", "2011-11-10", 10);
            for (String doc : docList) {
                System.out.println(doc);
            }
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /** Prints up to 15 "order" documents with no query. */
    @Test
    public void testqueryByFilter() {
        try {
            ESUtils utils = new ESUtils();
            List<String> docList = utils.queryByFilter("order", "order", 15);
            for (String doc : docList) {
                System.out.println(doc);
            }
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /** Deletes one "order" document by id and prints whether it was found. */
    @Test
    public void testDeleteDoc() {
        try {
            ESUtils utils = new ESUtils();
            boolean deleteDoc = utils.deleteDoc("order", "order", "AWKO26k95Sm1nfyWQa83");
            System.out.print(deleteDoc);
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /** Prints the minimum buyer_id in "order". */
    @Test
    public void testMinDoc() {
        try {
            ESUtils utils = new ESUtils();
            double min = utils.minDoc("order", "order", "buyer_id");
            System.out.print(min);
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /** Prints the maximum buyer_id in "order". */
    @Test
    public void testMaxDoc() {
        try {
            ESUtils utils = new ESUtils();
            double max = utils.maxDoc("order", "order", "buyer_id");
            System.out.print(max);
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /** Non-nested aggregation: groups "order" by buyer_id and prints the elapsed time. */
    @Test
    public void testGroup() {
        try {
            long startTime = System.currentTimeMillis();
            ESUtils utils = new ESUtils();
            utils.testSumGroupBy("order", "order", "buyer_id", 10);
            long endTime = System.currentTimeMillis();
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /** Prints the nested buyer_id / order_id grouping of "order" and the elapsed time. */
    @Test
    public void testPrint() {
        // The work now sits inside the try, like every other test (the try block used to be empty).
        try {
            long startTime = System.currentTimeMillis();
            ESUtils utils = new ESUtils();
            utils.getGroupPrint("order", "order", "buyer_id", "order_id", 10, "600ms");
            long endTime = System.currentTimeMillis();
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /** Prints the nested order_id / goods_price grouping of "item" and the elapsed time. */
    @Test
    public void testGroup2() {
        try {
            long startTime = System.currentTimeMillis();
            ESUtils utils = new ESUtils();
            utils.getGroupPrint("item", "item", "order_id", "goods_price", 10, "600ms");
            long endTime = System.currentTimeMillis();
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /**
     * For each buyer, sums goods_price over the items of all of that buyer's orders.
     *
     * <p>Map layout returned by getGroupByMap:
     * outer map &lt;index, buyer map&gt;; buyer map has "Key" (buyer_id), "DocCount" and
     * "Buckets" (&lt;order_id, doc count&gt;).
     */
    @Test
    public void testGroupOrderItem() {
        try {
            long startTime = System.currentTimeMillis();
            ESUtils utils = new ESUtils(); // one instance reused for all calls
            HashMap<?, ?> buyers = utils.getGroupByMap("order", "order", "buyer_id", "order_id", 10, "6000ms");
            for (Object buyerValue : buyers.values()) {
                Map<?, ?> buyer = (Map<?, ?>) buyerValue;
                Map<?, ?> orders = (Map<?, ?>) buyer.get("Buckets");
                double sum = 0.0;
                if (orders != null) { // a buyer with no order_id buckets contributes 0
                    for (Object orderId : orders.keySet()) {
                        sum += utils.getGroupSum("item", "item", "order_id", orderId.toString(), "goods_price", "6000ms");
                    }
                }
                System.out.println("buyer_id:" + buyer.get("Key") + "\t\tgoods_amount:" + sum);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }
}