1. 程式人生 > 實用技巧 >ES 之 Java API 基本操作和聚合操作

ES 之 Java API 基本操作和聚合操作

1、ES是基於Lucene開發的一個分散式全文搜尋引擎。往ES中儲存資料和從ES中查詢資料時,資料格式皆為JSON

索引:Index 相當於資料庫中的database
型別:type  相當於資料庫中的table
主鍵:id    相當於資料庫中的主鍵

往ES中儲存資料,其實就是往ES中的index下的type儲存Json資料

RESTFul風格API
    通過http的形式,傳送請求,對ES進行操作
        查詢: GET
        刪除: DELETE
        新增: PUT/POST
        修改: PUT/POST
2、基本api區別
  * 結論:相關度(全文)查詢使用match,短語查詢使用matchPhrase,精確值欄位查詢使用term。
  * match:全文搜尋, 通常用於對text型別欄位的查詢,會對進行查詢的文字先進行分詞操作
  * term:精確查詢,通常用於對keyword和有精確值的欄位進行查詢,不會對進行查詢的文字進行分詞操作

1、EsCRUD.java

package es;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg; import org.elasticsearch.search.aggregations.metrics.max.InternalMax; import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.InternalSum; import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.net.InetAddress; import java.util.Date; import java.util.Iterator; import java.util.Map; import java.util.Set; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; /** * * https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/index.html */ public class EsCRUD { private 
TransportClient client = null; @Before public void init() throws Exception { //設定叢集名稱 Settings settings = Settings.builder() .put("cluster.name", "bigdata") //自動感知的功能(可以通過當前指定的節點獲取所有es節點的資訊) .put("client.transport.sniff", true) .build(); //建立client client = new PreBuiltTransportClient(settings).addTransportAddresses( new InetSocketTransportAddress(InetAddress.getByName("192.168.33.100"), 9300), new InetSocketTransportAddress(InetAddress.getByName("192.168.33.101"), 9300), new InetSocketTransportAddress(InetAddress.getByName("192.168.33.102"), 9300)); } @Test public void testCreate() throws IOException { IndexResponse response = client.prepareIndex("gamelog", "users", "1") .setSource( jsonBuilder() .startObject() .field("username", "老趙") .field("gender", "male") .field("birthday", new Date()) .field("fv", 9999) .field("message", "trying out Elasticsearch") .endObject() ).get(); } //查詢一條 @Test public void testGet() throws IOException { GetResponse response = client.prepareGet("gamelog", "users", "1").get(); System.out.println(response.getSourceAsString()); } //查詢多條 @Test public void testMultiGet() throws IOException { MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("gamelog", "users", "1") .add("gamelog", "users", "2", "3") .add("news", "fulltext", "1") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); System.out.println(json); } } } @Test public void testUpdate() throws Exception { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("gamelog"); updateRequest.type("users"); updateRequest.id("2"); updateRequest.doc( jsonBuilder() .startObject() .field("fv", 999.9) .endObject()); client.update(updateRequest).get(); } @Test public void testDelete() { DeleteResponse response = client.prepareDelete("gamelog", "users", "2").get(); System.out.println(response); } @Test public void 
testDeleteByQuery() { BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) //指定查詢條件 .filter(QueryBuilders.matchQuery("username", "老段")) //指定索引名稱 .source("gamelog") .get(); long deleted = response.getDeleted(); System.out.println(deleted); } //非同步刪除 @Test public void testDeleteByQueryAsync() { DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("gamelog") .execute(new ActionListener<BulkByScrollResponse>() { @Override public void onResponse(BulkByScrollResponse response) { long deleted = response.getDeleted(); System.out.println("資料刪除了"); System.out.println(deleted); } @Override public void onFailure(Exception e) { e.printStackTrace(); } }); try { System.out.println("非同步刪除"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } } @Test public void testRange() { QueryBuilder qb = rangeQuery("fv") // [88.99, 10000) .from(88.99) .to(10000) .includeLower(true) .includeUpper(false); SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get(); System.out.println(response); } /** * curl -XPUT 'http://192.168.5.251:9200/player_info/player/1' -d '{ "name": "curry", "age": 29, "salary": 3500,"team": "war", "position": "pg"}' * curl -XPUT 'http://192.168.5.251:9200/player_info/player/2' -d '{ "name": "thompson", "age": 26, "salary": 2000,"team": "war", "position": "pg"}' * curl -XPUT 'http://192.168.5.251:9200/player_info/player/3' -d '{ "name": "irving", "age": 25, "salary": 2000,"team": "cav", "position": "pg"}' * curl -XPUT 'http://192.168.5.251:9200/player_info/player/4' -d '{ "name": "green", "age": 26, "salary": 2000,"team": "war", "position": "pf"}' * curl -XPUT 'http://192.168.5.251:9200/player_info/player/5' -d '{ "name": "james", "age": 33, "salary": 4000,"team": "cav", "position": "sf"}' */ @Test public void testAddPlayer() throws IOException { IndexResponse response = client.prepareIndex("player_info", "player", "1") .setSource( 
jsonBuilder() .startObject() .field("name", "James") .field("age", 33) .field("salary", 3000) .field("team", "cav") .field("position", "sf") .endObject() ).get(); } /** * https://elasticsearch.cn/article/102 * * select team, count(*) as player_count from player group by team; */ @Test public void testAgg1() { //指定索引和type SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //按team分組然後聚合,但是並沒有指定聚合函式 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team"); //新增聚合器 builder.addAggregation(teamAgg); //觸發 SearchResponse response = builder.execute().actionGet(); //System.out.println(response); //將返回的結果放入到一個map中 key:欄位名 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); // Set<String> keys = aggMap.keySet(); // // for (String key: keys) { // System.out.println(key); // } // //取出聚合屬性 StringTerms terms = (StringTerms) aggMap.get("player_count"); // //// //依次迭代出分組聚合資料 // for (Terms.Bucket bucket : terms.getBuckets()) { // //分組的名字 // String team = (String) bucket.getKey(); // //count,分組後一個組有多少資料 // long count = bucket.getDocCount(); // System.out.println(team + " " + count); // } Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator(); while (teamBucketIt .hasNext()) { Terms.Bucket bucket = teamBucketIt.next(); String team = (String) bucket.getKey(); long count = bucket.getDocCount(); System.out.println(team + " " + count); } } /** * select team, position, count(*) as pos_count from player group by team, position; */ @Test public void testAgg2() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //指定別名team_name和分組的欄位team TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position"); //新增兩個聚合構建器 builder.addAggregation(teamAgg.subAggregation(posAgg)); //執行查詢 SearchResponse response = builder.execute().actionGet(); //將查詢結果放入map中 
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); //根據屬性名到map中查詢 StringTerms teams = (StringTerms) aggMap.get("team_name"); //迴圈查詢結果 for (Terms.Bucket teamBucket : teams.getBuckets()) { //先按球隊進行分組 String team = (String) teamBucket.getKey(); Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); StringTerms positions = (StringTerms) subAggMap.get("pos_count"); //因為一個球隊有很多位置,那麼還要依次拿出位置資訊 for (Terms.Bucket posBucket : positions.getBuckets()) { //拿到位置的名字 String pos = (String) posBucket.getKey(); //拿出該位置的數量 long docCount = posBucket.getDocCount(); //列印球隊,位置,人數 System.out.println(team + " " + pos + " " + docCount); } } } /** * select team, max(age) as max_age from player group by team; */ @Test public void testAgg3() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //指定安球隊進行分組 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); //指定分組求最大值 MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age"); //分組後求最大值 builder.addAggregation(teamAgg.subAggregation(maxAgg)); //查詢 SearchResponse response = builder.execute().actionGet(); Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); //根據team屬性,獲取map中的內容 StringTerms teams = (StringTerms) aggMap.get("team_name"); for (Terms.Bucket teamBucket : teams.getBuckets()) { //分組的屬性名 String team = (String) teamBucket.getKey(); //在將聚合後取最大值的內容取出來放到map中 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); //取分組後的最大值 InternalMax ages = (InternalMax)subAggMap.get("max_age"); double max = ages.getValue(); System.out.println(team + " " + max); } } /** * select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team; */ @Test public void testAgg4() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //指定分組欄位 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team"); 
//指定聚合函式是求平均資料 AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age"); //指定另外一個聚合函式是求和 SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); //分組的聚合器關聯了兩個聚合函式 builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg)); SearchResponse response = builder.execute().actionGet(); Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); //按分組的名字取出資料 StringTerms teams = (StringTerms) aggMap.get("team_name"); for (Terms.Bucket teamBucket : teams.getBuckets()) { //獲取球隊名字 String team = (String) teamBucket.getKey(); Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); //根據別名取出平均年齡 InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age"); //根據別名取出薪水總和 InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary"); double avgAgeValue = avgAge.getValue(); double totalSalaryValue = totalSalary.getValue(); System.out.println(team + " " + avgAgeValue + " " + totalSalaryValue); } } /** * select team, sum(salary) as total_salary from player group by team order by total_salary desc; */ @Test public void testAgg5() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //按team進行分組,然後指定排序規則 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true)); SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); builder.addAggregation(termsAgg.subAggregation(sumAgg)); SearchResponse response = builder.execute().actionGet(); Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); StringTerms teams = (StringTerms) aggMap.get("team_name"); for (Terms.Bucket teamBucket : teams.getBuckets()) { String team = (String) teamBucket.getKey(); Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary"); double totalSalaryValue = 
totalSalary.getValue(); System.out.println(team + " " + totalSalaryValue); } } }