1. 程式人生 > >Elasticsearch(二):使用JAVA API實現簡單查詢、聚合查詢

Elasticsearch(二):使用JAVA API實現簡單查詢、聚合查詢

ES版本:2.3.1
JDK:1.8
所需要的jar包請從ES安裝路徑下的jars包中取得,不要使用其他來源的jar,否則容易出現版本問題!
注意:程式碼中 TransportClient client=ESLink.getTransportClient(); 所使用的連線工具類(ESLink)的實作請參考上一篇文章。

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse
; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search
.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.search.aggregations.metrics.max.InternalMax; import org.elasticsearch.search.aggregations.metrics.min.InternalMin; import org.elasticsearch.search.aggregations.metrics.sum
.InternalSum; import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import java.util.ArrayList; import java.util.HashMap; import java.util.List; /** * Created by LCY on 4/8/2018. * * 使用JAVA API對ES進行查詢刪除操作 */ public class ESUtils { /** * 簡單的查詢 * select * from {index} where {field}={Accept} limit {size}; * @param index 查詢的索引 * @param type 查詢的type,可以使用heda檢視 * @param field 查詢的欄位 * @param Accept 查詢的內容 * @param size 查詢結果的條數 * @return SearchResponse的Json */ public List<String> queryByFilter_Accept(String index, String type, String field, String Accept, int size) { TransportClient client=ESLink.getTransportClient(); SearchResponse response = client.prepareSearch(index)//設定要查詢的索引(index) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setTypes(type)//設定type, 這個在建立索引的時候同時設定了, 或者可以使用head工具檢視 .setQuery(QueryBuilders.matchQuery(field, Accept)) //在這裡"message"是要查詢的field,"Accept"是要查詢的內容 .setFrom(0) .setSize(size) .setExplain(true) .execute() .actionGet(); List<String> docList = new ArrayList<String>(); for (SearchHit hit : response.getHits()) { docList.add(hit.getSourceAsString()); } client.close(); return docList; } /** * 簡單的查詢 * select * from {index} limit {size}; * @param index 查詢的索引 * @param type 查詢的type,可以使用heda檢視 * @param size 查詢結果的條數 * @return 結果list */ public List<String> queryByFilter(String index, String type, int size) { TransportClient client=ESLink.getTransportClient(); SearchResponse response = client.prepareSearch(index)//設定要查詢的索引(index) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setTypes(type)//設定type, 這個在建立索引的時候同時設定了, 或者可以使用head工具檢視 .setFrom(0) .setSize(size) .setExplain(true) .execute() .actionGet(); List<String> docList = new ArrayList<String>(); for (SearchHit hit : response.getHits()) { docList.add(hit.getSourceAsString()); } client.close(); return docList; } /** * 刪除一條資料 *DELETE FROM {index} WHERE id = {id}; * @param index * @param type * @param id */ public boolean deleteDoc(String index, String type, String id) { boolean flg = false; 
TransportClient client=ESLink.getTransportClient(); DeleteResponse deleteResponse = client .prepareDelete() .setIndex(index) .setType(type) .setId(id) .get(); if (deleteResponse.isFound()) { flg = true; } client.close(); return flg; } /** * 使用min聚合查詢某個欄位上最小的值。 *SELECT Min({field}) FROM index; * @param index * @param type */ public double minDoc(String index, String type, String field) { TransportClient client=ESLink.getTransportClient(); SearchResponse response = client .prepareSearch(index) .setTypes(type) .addAggregation(AggregationBuilders.min("min").field(field)) .get(); InternalMin min = response.getAggregations().get("min"); client.close(); return min.getValue(); } /** * 使用max聚合查詢某個欄位上最大的值。 * SELECT MAX(field) FROM index; * @param index * @param type */ public double maxDoc(String index, String type, String field) { TransportClient client=ESLink.getTransportClient(); SearchResponse response = client .prepareSearch(index) .setTypes(type) .addAggregation(AggregationBuilders.max("max").field(field)) .get(); InternalMax max = response.getAggregations().get("max"); return max.getValue(); } /** * 使用max聚合查詢某個欄位上最大的值。 * SELECT SUM(field) FROM index; * @param index * @param type */ public double sumDoc(String index, String type, String field) { TransportClient client=ESLink.getTransportClient(); SearchResponse response = client .prepareSearch(index) .setTypes(type) .addAggregation(AggregationBuilders.sum("sum").field(field)) .get(); InternalSum sum = response.getAggregations().get("sum"); client.close(); return sum.getValue(); } /** * 聚合查詢 * * @param index * @param type * @param group_name * @param size */ public void testSumGroupBy(String index, String type, String group_name, int size) { TransportClient client=ESLink.getTransportClient(); TermsBuilder termsBuilder= AggregationBuilders .terms("group_return").field(group_name); SearchRequestBuilder searchRequestBuilder =client.prepareSearch(index).setTypes(type) .addAggregation(termsBuilder) .setSize(size); 
SearchResponse sr = searchRequestBuilder.execute().actionGet(); Terms genders = sr.getAggregations().get("group_return"); for (Terms.Bucket entry : genders.getBuckets()) { System.out.println("Key: "+entry.getKey()+"\t\tDoc:"+entry.getDocCount()); } client.close(); } /** * 巢狀聚合查詢 * select {group_name1},{group_name2} from {index} GROUP BY {group_name1} limit {size}; * @param index * @param type * @param group_name1 外層分組的欄位 * @param group_name2 內層分組的欄位 * @param size */ public void getGroupBy(String index, String type, String group_name1, String group_name2,int size,String timeOut) { TransportClient client=ESLink.getTransportClient(); TermsBuilder termsBuilder= AggregationBuilders .terms("group_return1").field(group_name1); TermsBuilder termsBuilder1= AggregationBuilders .terms("group_return2").field(group_name2); termsBuilder.subAggregation(termsBuilder1); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type) .addAggregation(termsBuilder).setSize(size) .setTimeout(timeOut); SearchResponse sr = searchRequestBuilder.execute().actionGet(); Terms group1 = sr.getAggregations().get("group_return1"); for (Terms.Bucket entry_group1 : group1.getBuckets()) { System.out.println("Key: "+entry_group1.getKey()+"\t\tCountDoc:"+entry_group1.getDocCount()); Terms group2 = entry_group1.getAggregations().get("group_return2"); for (Terms.Bucket entry_group2 : group2.getBuckets()) { System.out.println("\t\t"+"Key: "+entry_group2.getKey()+"\t\tCountDoc:"+entry_group2.getDocCount()); } } client.close(); } /** * 巢狀聚合查詢儲存 * select {group_name1},{group_name2} from {index} GROUP BY {group_name1} limit {size}; * @param index * @param type * @param group_name1 外層分組的欄位 * @param group_name2 內層分組的欄位 * @param size * * @return 查詢結果的map,三層map,為json對應的位置 */ public HashMap getGroupByMap(String index, String type, String group_name1, String group_name2,int size,String timeOut) { TransportClient client=ESLink.getTransportClient(); TermsBuilder termsBuilder= 
AggregationBuilders .terms("group_return1").field(group_name1); TermsBuilder termsBuilder1= AggregationBuilders .terms("group_return2").field(group_name2); termsBuilder.subAggregation(termsBuilder1); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type) .addAggregation(termsBuilder).setSize(size) .setTimeout(timeOut); SearchResponse sr = searchRequestBuilder.execute().actionGet(); Terms group1 = sr.getAggregations().get("group_return1"); HashMap<Integer, HashMap<String, Object>> res=new HashMap<>(); for (Terms.Bucket entry_group1 : group1.getBuckets()) { HashMap<String,Object> res1=new HashMap<String,Object>(); res1.put("Key",entry_group1.getKey()); res1.put("DocCount",entry_group1.getDocCount()); Terms group2 = entry_group1.getAggregations().get("group_return2"); for (Terms.Bucket entry_group2 : group2.getBuckets()) { HashMap<Object, Long> res2=new HashMap<>(); res2.put(entry_group2.getKey(),entry_group2.getDocCount()); res1.put("Buckets",res2); } res.put(size,res1); size--; } client.close(); return res; } /** * 巢狀聚合查詢儲存 * select {group_name1},{group_name2} from {index} GROUP BY {group_name1} ; * @param index * @param type * @param group_name1 外層分組的欄位 * @param group_name2 內層分組的欄位 * * @return 查詢結果的map,三層map,為json的Aggregation對應的三層 */ public HashMap getGroupByMap(String index, String type, String group_name1, String group_name2,String timeOut) { TransportClient client=ESLink.getTransportClient(); int size=0; TermsBuilder termsBuilder= AggregationBuilders .terms("group_return1").field(group_name1); TermsBuilder termsBuilder1= AggregationBuilders .terms("group_return2").field(group_name2); termsBuilder.subAggregation(termsBuilder1); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type) .addAggregation(termsBuilder).setSize(size) .setTimeout(timeOut); SearchResponse sr = searchRequestBuilder.execute().actionGet(); Terms group1 = sr.getAggregations().get("group_return1"); HashMap<Integer, HashMap<String, 
Object>> res=new HashMap<>(); for (Terms.Bucket entry_group1 : group1.getBuckets()) { HashMap<String,Object> res1=new HashMap<String,Object>(); res1.put("Key",entry_group1.getKey()); res1.put("DocCount",entry_group1.getDocCount()); Terms group2 = entry_group1.getAggregations().get("group_return2"); for (Terms.Bucket entry_group2 : group2.getBuckets()) { HashMap<Object, Long> res2=new HashMap<>(); res2.put(entry_group2.getKey(),entry_group2.getDocCount()); res1.put("Buckets",res2); } res.put(size,res1); size++; } client.close(); return res; } /** * 巢狀查詢獲取列印結果列印 * * select {group_name1},{group_name2} from {index} GROUP BY {group_name1} limit {size}; * @param index * @param type * @param group_name1 外層分組的欄位 * @param group_name2 內層分組的欄位 * @param size 獲取的結果的大小 * */ public void getGroupPrint(String index, String type, String group_name1, String group_name2,int size,String timeOut){ HashMap hashMap=getGroupByMap(index, type, group_name1, group_name2, size, timeOut); for (Object h : hashMap.keySet()) { HashMap hashMap1 = (HashMap) hashMap.get(h); System.out.println("Key: "+ hashMap1.get("Key")+"\t\tCountDoc:"+ hashMap1.get("DocCount")); HashMap hashMap2= (HashMap)hashMap1.get("Buckets"); for (Object key :hashMap2.keySet()) { System.out.println("\t\t"+"Key: "+key+"\t\tCountDoc:"+ hashMap2.get(key)); } } } /** * 聚合查詢獲得Sum結果結果列印 * @SQL select sum{group_name} from {index} WHERE field_name=field_value GROUP BY {group_name}; * @param index * @param type * @param field_name 要精確查詢的欄位名 * @param field_value 要精確查詢的欄位值 * @param group_name 分組的欄位名 * @param timeOut 查詢時間 * */ public double getGroupSum(String index, String type, String field_name,String field_value, String group_name,String timeOut) { TransportClient client=ESLink.getTransportClient(); SumBuilder sumBuilder=AggregationBuilders.sum("group_sum").field(group_name); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setTypes(type) 
.setQuery(QueryBuilders.matchQuery(field_name, field_value)) .addAggregation(sumBuilder) .setSize(0) .setTimeout(timeOut); SearchResponse sr = searchRequestBuilder.execute().actionGet(); InternalSum amount = sr.getAggregations().get("group_sum"); client.close(); return amount.value(); } }

測試程式碼:

import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Manual smoke tests for {@code ESUtils}.
 *
 * <p>Each test prints its result to standard output. Exceptions are caught and printed to
 * standard error rather than rethrown, so a failing request does not fail the test.
 */
public class ESTest {
    @Test
    public void testqueryByFilterr_Accept() {
        try {
            ESUtils utils = new ESUtils();
            List<String> docList = utils.queryByFilter_Accept("order", "order", "create_date", "2011-11-10", 10);
            for (String doc : docList) {
                System.out.println(doc);
            }
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    @Test
    public void testqueryByFilter() {
        try {
            ESUtils utils = new ESUtils();
            List<String> docList = utils.queryByFilter("order", "order", 15);
            for (String doc : docList) {
                System.out.println(doc);
            }
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    @Test
    public void testDeleteDoc() {
        try {
            ESUtils utils = new ESUtils();
            boolean deleteDoc = utils.deleteDoc("order", "order", "AWKO26k95Sm1nfyWQa83");
            System.out.print(deleteDoc);
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    @Test
    public void testMinDoc() {
        try {
            ESUtils utils = new ESUtils();
            double min = utils.minDoc("order", "order", "buyer_id");
            System.out.print(min);
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    @Test
    public void testMaxDoc() {
        try {
            ESUtils utils = new ESUtils();
            double max = utils.maxDoc("order", "order", "buyer_id");
            System.out.print(max);
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /**
     * Single-level (non-nested) group-by, with elapsed time printed.
     */
    @Test
    public void testGroup() {
        try {
            long startTime = System.currentTimeMillis();   // start of the timed section
            ESUtils utils = new ESUtils();
            utils.testSumGroupBy("order", "order", "buyer_id", 10);
            long endTime = System.currentTimeMillis(); // end of the timed section
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /**
     * Nested group-by printed through {@code getGroupPrint}, with elapsed time printed.
     */
    @Test
    public void testPrint() {
        // The calls used to sit outside an empty try block; they now run inside it so that
        // errors are reported the same way as in the sibling tests.
        try {
            long startTime = System.currentTimeMillis();   // start of the timed section
            ESUtils utils = new ESUtils();
            utils.getGroupPrint("order", "order", "buyer_id", "order_id", 10, "600ms");
            long endTime = System.currentTimeMillis(); // end of the timed section
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /**
     * Nested group-by on the item index, with elapsed time printed.
     */
    @Test
    public void testGroup2() {
        try {
            long startTime = System.currentTimeMillis();   // start of the timed section
            ESUtils utils = new ESUtils();
            utils.getGroupPrint("item", "item", "order_id", "goods_price", 10, "600ms");
            long endTime = System.currentTimeMillis(); // end of the timed section
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }

    /**
     * Groups orders by buyer, then sums goods_price over the items of each order found for
     * that buyer and prints one total per buyer.
     *
     * Map layout returned by getGroupByMap:
     *   outer map   sequence number -> buyer map
     *   buyer map   "Key" / "DocCount" / "Buckets"
     *   "Buckets"   order_id -> document count
     */
    @Test
    public void testGroupOrderItem() {
        try {
            long startTime = System.currentTimeMillis();
            // One helper instance is enough; the original created a new one per summed order.
            ESUtils utils = new ESUtils();
            HashMap<?, ?> buyers = utils.getGroupByMap("order", "order", "buyer_id", "order_id", 10, "6000ms");
            for (Object value : buyers.values()) {
                Map<?, ?> buyer = (Map<?, ?>) value;
                Map<?, ?> orders = (Map<?, ?>) buyer.get("Buckets");
                double sum = 0.0;
                // A buyer without an order bucket contributes 0 instead of raising a
                // NullPointerException.
                if (orders != null) {
                    for (Object orderId : orders.keySet()) {
                        sum += utils.getGroupSum("item", "item", "order_id", orderId.toString(), "goods_price", "6000ms");
                    }
                }
                System.out.println("buyer_id:" + buyer.get("Key") + "\t\tgoods_amount:" + sum);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("程式執行時間: " + (endTime - startTime) + "ms");
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }
}