
Working with Elasticsearch from Java

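  In a plain Java project you can talk to ES with its Transport client; in a Spring Boot project you can use Spring Data Elasticsearch. (The TransportClient is deprecated as of 7.0 and removed in 8.0 in favor of the REST clients.)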

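  Note that ES serves HTTP on port 9200 and uses port 9300 for TCP transport communication. The Java transport client speaks the transport protocol, so it must connect to port 9300.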
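Because the transport client needs 9300 rather than 9200, a quick pre-flight check that the port accepts TCP connections can save some confusion. The following is a minimal sketch using plain java.net.Socket. PortCheck is a hypothetical helper, not part of ES. A successful check only shows that something is listening on the port, not that it is an Elasticsearch node.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections
final class PortCheck {
    /**
     * Returns true if a TCP connection to host:port succeeds within timeoutMillis.
     * Unknown hosts, refused connections and timeouts all return false.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, you could call PortCheck.isReachable("127.0.0.1", 9300, 2000) before building the TransportClient.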

Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

1. POM configuration:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.qlq</groupId>
    <artifactId>esclient</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
</project>

Create log4j2.properties under resources:

appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %marker%m%n

rootLogger.level = info
rootLogger.appenderRef.console.ref = console

2. API tests

The basic workflow is:

1. Create the client

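Basic setup, which assumes the cluster uses the default name "elasticsearch". Add one transport address per node; host1 and host2 are placeholders: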

// on startup

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));

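If the cluster name is not the default "elasticsearch", set cluster.name explicitly. Otherwise the client ignores nodes whose cluster name does not match: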

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...
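Sometimes the node list comes from configuration rather than from hard-coded host1/host2 entries. In that case a small parser helps. The sketch below assumes a hypothetical comma-separated "host:port" format, where the port defaults to the transport port 9300. It does not handle IPv6 literals. TransportHosts is an illustrative name, not an ES class.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "host1:9300,host2:9301,host3" into socket addresses
final class TransportHosts {
    static final int DEFAULT_TRANSPORT_PORT = 9300;

    static List<InetSocketAddress> parse(String spec) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String raw : spec.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = entry.lastIndexOf(':');
            String host = colon >= 0 ? entry.substring(0, colon) : entry;
            int port = colon >= 0 ? Integer.parseInt(entry.substring(colon + 1)) : DEFAULT_TRANSPORT_PORT;
            // new InetSocketAddress(host, port) resolves the host name immediately
            addresses.add(new InetSocketAddress(host, port));
        }
        return addresses;
    }
}
```

Each resulting address should then be usable as client.addTransportAddress(new TransportAddress(address)), since TransportAddress has a constructor that takes an InetSocketAddress.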

With the TransportClient you specify the index, type, ID and so on for each request. For example:

Specify the index when searching. prepareSearch() takes varargs, so several indices can be searched at once; with no index given it searches all indices:

        SearchRequestBuilder srb1 = client.prepareSearch("orders").setQuery(QueryBuilders.queryStringQuery("qiao"))
                .setSize(1);

Specify the index and type when indexing (if no ID is given, ES generates one):

IndexResponse response = client.prepareIndex("orders", "order").setSource(builder).get();

2. Operate on the data

  Perform the CRUD operations.

3. Close the client

// on shutdown

client.close();

0. Preparation

1. Set the cluster name in EK/conf/elasticsearch.yml:

cluster.name: my-application

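2. Start two nodes. Each needs its own HTTP port and data path. The transport port is picked automatically from the 9300-9400 range, so the second node gets 9301: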

elasticsearch.bat -Ehttp.port=9200 -Epath.data=E:/data/0
elasticsearch.bat -Ehttp.port=19200 -Epath.data=E:/data/1

3. Check the nodes

Open http://localhost:9200/_cluster/stats?pretty
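The same check can be done from Java over the HTTP port, without the transport client. This is a minimal sketch that uses only the JDK's HttpURLConnection and stays Java 8 compatible. EsHttp is a hypothetical helper name. It returns the response body and throws on I/O errors or on a non-2xx status.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: a plain-JDK HTTP GET against the ES REST port (9200)
final class EsHttp {
    /** GETs the URL and returns the body as UTF-8 text; throws on I/O errors or non-2xx status. */
    static String get(String url) {
        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(2000); // milliseconds
            conn.setReadTimeout(5000);    // milliseconds
            int code = conn.getResponseCode();
            if (code < 200 || code >= 300) {
                throw new IOException("HTTP " + code + " from " + url);
            }
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }
}
```

Usage: System.out.println(EsHttp.get("http://localhost:9200/_cluster/stats?pretty"));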

1. Document APIs

1. Index a document

The document source can be built in four ways:

Manually (aka do it yourself) using native byte[] or as a String

Using a Map that will be automatically converted to its JSON equivalent

Using a third party library to serialize your beans such as Jackson

Using built-in helpers XContentFactory.jsonBuilder()
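The Map option works because the client serializes the map to JSON for you. To make that concrete, here is a deliberately minimal serializer for flat maps of strings, booleans and numbers. SimpleJson is hypothetical and is not an ES class. The quote() method also shows the escaping you must handle yourself when you build a JSON string by hand, as in approach 2 below.

```java
import java.util.Map;

// Hypothetical helper: a minimal JSON writer for flat maps (what setSource(Map) does for you)
final class SimpleJson {
    /** Serializes a flat map whose values are String, Boolean, finite Number or null. */
    static String toJson(Map<String, ?> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, ?> e : map.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':').append(value(e.getValue()));
        }
        return sb.append('}').toString();
    }

    private static String value(Object v) {
        if (v == null) {
            return "null";
        }
        if (v instanceof Boolean || v instanceof Number) {
            return v.toString(); // emitted unquoted, e.g. false rather than "false"
        }
        return quote(v.toString());
    }

    /** Wraps s in double quotes and escapes the characters JSON requires. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

You could pass the resulting string to setSource(json, XContentType.JSON), just like the hand-built string in approach 2.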

Approach 1: build the source with the ES helper (XContentFactory.jsonBuilder())

    private static void createDocument() throws UnknownHostException, IOException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Build the document
        // Approach 1: use the ES helper
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("username", "qiaozhi")
                .field("fullname", "喬治").field("created", new Date()).field("deleted", false).endObject();
        IndexResponse response = client.prepareIndex("accounts", "person", "1").setSource(builder).get();

        // Approach 2: build the JSON string yourself
        // IndexResponse response = client.prepareIndex("twitter", "_doc")
        // .setSource(json, XContentType.JSON)
        // .get();

        // 列印儲存資訊
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version (if it's the first time you index this document, you will
        // get: 1)
        long _version = response.getVersion();
        System.out.println("_version " + _version);
        // Status: CREATED for a new document, OK when an existing one was overwritten
        RestStatus status = response.status();
        System.out.println("status " + status);

        // on shutdown
        client.close();
    }

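Result. Document 1 already existed from an earlier run, so it was overwritten: _version is 2 and the status is OK rather than CREATED.

_index accounts
_type person
_id 1
_version 2
status OK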

Approach 2: build the JSON by hand and omit the ID, so ES generates one

    private static void createDocument() throws UnknownHostException, IOException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Build the document (note that "deleted" is stored as the string "false", not a boolean)
        String json = "{" + "\"username\":\"zhangsan\"," + "\"fullname\":\"張三\"," + "\"deleted\":\"false\","
                + "\"created\":\"2020-02-05\"" + "}";
        // Index the document without an ID, so ES generates one
        IndexResponse response = client.prepareIndex("accounts", "person").setSource(json, XContentType.JSON).get();

        // Print the indexing result
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version (if it's the first time you index this document, you will
        // get: 1)
        long _version = response.getVersion();
        System.out.println("_version " + _version);
        // Status: CREATED for a new document, OK when an existing one was overwritten
        RestStatus status = response.status();
        System.out.println("status " + status);

        // on shutdown
        client.close();
    }

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 1
status CREATED

Query it in Kibana:

GET accounts/person/nvyXyXMB58D4pLOfTCzx

#! Deprecation: [types removal] Specifying types in document get requests is deprecated, use the /{index}/_doc/{id} endpoint instead.
{
  "_index" : "accounts",
  "_type" : "person",
  "_id" : "nvyXyXMB58D4pLOfTCzx",
  "_version" : 1,
  "_seq_no" : 7,
  "_primary_term" : 3,
  "found" : true,
  "_source" : {
    "username" : "zhangsan",
    "fullname" : "張三",
    "deleted" : "false",
    "created" : "2020-02-05"
  }
}

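Note: if a document with the same ID already exists, indexing replaces it. The whole source is overwritten, not merged. For example: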

        // Build the document
        String json = "{" + "\"username\":\"zhangsan2\"," + "\"fullname\":\"張三2\"," + "\"deleted\":\"false\","
                + "\"created\":\"2020-02-05\"" + "}";
        // Index with the ID of the existing document, which overwrites it
        IndexResponse response = client.prepareIndex("accounts", "person", "nvyXyXMB58D4pLOfTCzx")
                .setSource(json, XContentType.JSON).get();

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 2
status OK

Viewing it in Kibana:

GET accounts/person/nvyXyXMB58D4pLOfTCzx

#! Deprecation: [types removal] Specifying types in document get requests is deprecated, use the /{index}/_doc/{id} endpoint instead.
{
  "_index" : "accounts",
  "_type" : "person",
  "_id" : "nvyXyXMB58D4pLOfTCzx",
  "_version" : 2,
  "_seq_no" : 8,
  "_primary_term" : 3,
  "found" : true,
  "_source" : {
    "username" : "zhangsan2",
    "fullname" : "張三2",
    "deleted" : "false",
    "created" : "2020-02-05"
  }
}

Note: querying either node of the test cluster returns the document. Assuming the default of one replica, each of the two nodes holds a copy:

liqiang@root MINGW64 ~/Desktop
$ curl http://localhost:9200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","_version":2,"_seq_no":8,"_primary_term":3,"found":true,"_source":{"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}}

liqiang@root MINGW64 ~/Desktop
$ curl http://localhost:19200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","_version":2,"_seq_no":8,"_primary_term":3,"found":true,"_source":{"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}}

2. Get a document

    private static void getDocument() throws UnknownHostException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Get by ID
        GetResponse response = client.prepareGet("accounts", "person", "nvyXyXMB58D4pLOfTCzx").get();

        // Print the result
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version (if it's the first time you index this document, you will
        // get: 1)
        long _version = response.getVersion();
        System.out.println("_version " + _version);
        // Read the stored source
        String sourceAsString = response.getSourceAsString();
        System.out.println("sourceAsString " + sourceAsString);
        Map<String, Object> sourceAsMap = response.getSourceAsMap();
        System.out.println("sourceAsMap " + sourceAsMap);

        // on shutdown
        client.close();
    }

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 2
sourceAsString {"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}
sourceAsMap {deleted=false, created=2020-02-05, fullname=張三2, username=zhangsan2}

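Multi-get:

  First, create another document: an order, with index orders and type order. Since ES 6.0 an index can hold only one mapping type, so the order cannot go into the accounts index. Fetching it: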

liqiang@root MINGW64 ~/Desktop
$ curl http://localhost:9200/orders/order/HSJgznMBk9PkhN4HiuEb
{"_index":"orders","_type":"order","_id":"HSJgznMBk9PkhN4HiuEb","_version":1,"_seq_no":0,"_primary_term":1,"found":true,"_source":{"desc":"測試訂單","createTime":"2020-08-08T14:01:37.160Z","deleted":false,"username":"zhangsan2"}}

The multi-get call:

    private static void batchSelect() throws Exception {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Fetch documents from several indices; add() also takes a varargs list of IDs
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("accounts", "person", "nvyXyXMB58D4pLOfTCzx")
                .add("orders", "order", "HSJgznMBk9PkhN4HiuEb", "otherId").get();
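        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            // A failed item (e.g. the index does not exist) carries no GetResponse
            if (itemResponse.isFailed()) {
                continue;
            }
            GetResponse response = itemResponse.getResponse();
            // "otherId" does not exist, so isExists() is false for it and it is skipped
            if (response.isExists()) {
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }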

        // on shutdown
        client.close();
    }

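Result. This was captured after the update in step 4 below had been applied, hence fullname 修改後. otherId does not exist and is skipped.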

{"username":"zhangsan2","fullname":"修改後","deleted":"false","created":"2020-02-05"}
{"desc":"測試訂單","createTime":"2020-08-08T14:01:37.160Z","deleted":false,"username":"zhangsan2"}

3. Delete a document

    private static void deleteDoc() throws UnknownHostException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Delete by ID
        DeleteResponse response = client.prepareDelete("accounts", "person", "nvyXyXMB58D4pLOfTCzx").get();

        // Print the result
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version: a delete also increments the document's version
        long _version = response.getVersion();
        System.out.println("_version " + _version);

        // on shutdown
        client.close();
    }

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 3

Querying it again after the delete:

$ curl http://localhost:9200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","found":false}

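You can also delete all documents that match a query (delete by query). getDeleted() returns how many were removed: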

        BulkByScrollResponse response = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                .filter(QueryBuilders.matchQuery("fullname", "修改後")).source("accounts").get();
        long deleted = response.getDeleted();

4. Update a document

For example, given this existing document:

$ curl http://localhost:9200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","_version":1,"_seq_no":10,"_primary_term":4,"found":true,"_source":{"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}}

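(1) Index a document with the same ID again. This replaces the whole document, as shown above.

(2) Use an UpdateRequest. This performs a partial update: only the fields passed to doc() are changed, and the other fields are kept.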

        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("accounts");
        updateRequest.type("person");
        updateRequest.id("nvyXyXMB58D4pLOfTCzx");
        updateRequest.doc(XContentFactory.jsonBuilder().startObject().field("fullname", "修改後").endObject());
        UpdateResponse updateResponse = client.update(updateRequest).get();
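The difference between (1) and (2) can be modeled with plain maps. The sketch below is a hypothetical in-memory stand-in and is not ES code. index() replaces the whole source and update() merges fields. Both bump a version counter, the same way _version went from 1 to 2 when the document was overwritten earlier. The model ignores ES details such as no-op detection when an update changes nothing.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of index vs. update semantics (not the ES API)
final class InMemoryDocs {
    private final Map<String, Map<String, Object>> sources = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    /** Like an index request with an ID: replaces the whole source. Returns the new version. */
    long index(String id, Map<String, Object> source) {
        sources.put(id, new LinkedHashMap<>(source));
        return versions.merge(id, 1L, Long::sum);
    }

    /** Like an UpdateRequest with doc(...): merges the given fields into the existing source. */
    long update(String id, Map<String, Object> partialDoc) {
        Map<String, Object> existing = sources.get(id);
        if (existing == null) {
            // ES also rejects a plain doc update of a missing document (document_missing_exception)
            throw new IllegalStateException("document missing: " + id);
        }
        existing.putAll(partialDoc);
        return versions.merge(id, 1L, Long::sum);
    }

    /** Returns a copy of the stored source, or null if the ID is unknown. */
    Map<String, Object> get(String id) {
        Map<String, Object> source = sources.get(id);
        return source == null ? null : new LinkedHashMap<>(source);
    }
}
```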

2. Search APIs

First, index ten test documents:

    private static void createDocument() throws UnknownHostException, IOException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        for (int i = 0; i < 10; i++) {
            // Build the document with the ES helper
            XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("desc", "測試訂單" + i)
                    .field("createTime", new Date()).field("deleted", false).field("username", "zhangsan").endObject();
            IndexResponse response = client.prepareIndex("orders", "order").setSource(builder).get();

            // Print the indexing result
            // Index name
            String _index = response.getIndex();
            System.out.println("_index " + _index);
            // Type name
            String _type = response.getType();
            System.out.println("_type " + _type);
            // Document ID (generated or not)
            String _id = response.getId();
            System.out.println("_id " + _id);
            // Version (if it's the first time you index this document, you will
            // get: 1)
            long _version = response.getVersion();
            System.out.println("_version " + _version);
            // Status: CREATED for a new document, OK when an existing one was overwritten
            RestStatus status = response.status();
            System.out.println("status " + status);
        }

        // on shutdown
        client.close();
    }

1. Scroll search, which pages through the results in batches

        QueryBuilder qb = QueryBuilders.termQuery("username", "zhangsan");

        SearchResponse scrollResp = client.prepareSearch("orders")
                .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(new TimeValue(60000)).setQuery(qb)
                .setSize(3).get();
        // Scroll until no hits are returned

        int startPage = 1;
        do {
            System.out.println("開始分頁===" + (startPage++));
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                // Handle the hit...
                String sourceAsString = hit.getSourceAsString();
                System.out.println(sourceAsString);
            }
            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute()
                    .actionGet();
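        } while (scrollResp.getHits().getHits().length != 0);

        // Clear the scroll so the server frees the search context now
        // instead of keeping it until the 60 s keep-alive expires
        client.prepareClearScroll().addScrollId(scrollResp.getScrollId()).get();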

Result:

開始分頁===1
{"desc":"測試訂單0","createTime":"2020-08-09T02:31:18.435Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單1","createTime":"2020-08-09T02:31:19.455Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單2","createTime":"2020-08-09T02:31:19.655Z","deleted":false,"username":"zhangsan"}
開始分頁===2
{"desc":"測試訂單3","createTime":"2020-08-09T02:31:19.819Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單4","createTime":"2020-08-09T02:31:20.312Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單5","createTime":"2020-08-09T02:31:20.402Z","deleted":false,"username":"zhangsan"}
開始分頁===3
{"desc":"測試訂單6","createTime":"2020-08-09T02:31:20.536Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單7","createTime":"2020-08-09T02:31:20.667Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單8","createTime":"2020-08-09T02:31:20.866Z","deleted":false,"username":"zhangsan"}
開始分頁===4
{"desc":"測試訂單9","createTime":"2020-08-09T02:31:21.137Z","deleted":false,"username":"zhangsan"}
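The scroll loop stops when a scroll call returns an empty batch. That is why 10 matching documents with setSize(3) came back as four pages (3 + 3 + 3 + 1). The sketch below simulates that loop over an in-memory list, with a plain offset standing in for the scroll ID. ScrollSimulation is hypothetical and does not reflect how ES tracks scroll state internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the scroll loop over an in-memory hit list (not ES code)
final class ScrollSimulation {
    /** Returns the batches a scroll with the given page size would hand out, in order. */
    static <T> List<List<T>> batches(List<T> hits, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> pages = new ArrayList<>();
        int cursor = 0;                               // stands in for the scroll ID
        List<T> page = nextBatch(hits, cursor, size); // like the initial search
        while (!page.isEmpty()) {                     // stop on the first empty batch
            pages.add(page);
            cursor += page.size();
            page = nextBatch(hits, cursor, size);     // like prepareSearchScroll(...)
        }
        return pages;
    }

    private static <T> List<T> nextBatch(List<T> hits, int from, int size) {
        int to = Math.min(hits.size(), from + size);
        int start = Math.min(from, to);
        return new ArrayList<>(hits.subList(start, to));
    }
}
```

With the ten test orders and a size of 3, batches(...) yields four lists, and the last one holds a single element, matching the output above.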

2. MultiSearch: run several searches in one request (a search with no index covers all indices)

        SearchRequestBuilder srb1 = client.prepareSearch().setQuery(QueryBuilders.queryStringQuery("qiao")).setSize(1);
        SearchRequestBuilder srb2 = client.prepareSearch().setQuery(QueryBuilders.matchQuery("username", "zhangsan2"))
                .setSize(1);
        MultiSearchResponse sr = client.prepareMultiSearch().add(srb1).add(srb2).get();

        // You will get all individual responses from
        // MultiSearchResponse#getResponses()
        for (MultiSearchResponse.Item item : sr.getResponses()) {
            // Skip searches that failed; their getResponse() is null
            if (item.isFailure()) {
                continue;
            }
            SearchResponse response = item.getResponse();
            SearchHits hits = response.getHits();
            for (SearchHit hit : hits) {
                System.out.println(hit.getSourceAsString());
            }
        }

Result. No index was specified, so srb1 also matched a document in an index not created in this post:

{
"name": "zhi",
"lastName": "qiao",
"job": "enginee"
}

{"desc":"測試訂單","createTime":"2020-08-08T14:01:37.160Z","deleted":false,"username":"zhangsan2"}

3. Query DSL and aggregations

  Covered in the next post.