1. 程式人生 > >Elasticsearch Java API 很全的整理

Elasticsearch Java API 很全的整理

Elasticsearch 的API 分為 REST Client API(http請求形式)以及 transportClient API兩種。相比來說transportClient API效率更高,transportClient 是通過Elasticsearch內部RPC的形式進行請求的,連線可以是一個長連線,相當於是把客戶端的請求當成

Elasticsearch 叢集的一個節點。但是從Elasticsearch 7 後就會移除transportClient 。主要原因是transportClient 難以向下相容版本。

本文中所有的講解和操作都是基於jdk 1.8 和elasticsearch 6.2.4版本。

備註:本文參考了很多Elasticsearch 的官方文件以及部l網路資料做的綜合整理。

一、High REST Client

High Client 基於 Low Client, 主要目的是暴露一些 API,這些 API 可以接受請求物件為引數,返回響應物件,而對請求和響應細節的處理都是由 client 自動完成的。

API 在呼叫時都可以是同步或者非同步兩種形式
同步 API 會導致阻塞,一直等待資料返回
非同步 API 在命名上會加上 async 字尾,需要有一個 listener 作為引數,等這個請求返回結果或者發生錯誤時,這個 listener 就會被呼叫,listener主要是解決自動回撥的問題,有點像安卓 開發裡面的listener監聽回撥。

Elasticsearch REST APi 官方 地址:https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html

Maven 依賴

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>

client初始化:

RestHighLevelClient 例項依賴 REST low-level client builder

public class ElasticSearchClient {
private String[] hostsAndPorts;

public ElasticSearchClient(String[] hostsAndPorts) {
this.hostsAndPorts = hostsAndPorts;
}
public RestHighLevelClient getClient() {
        RestHighLevelClient client = null;
        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        if (hostsAndPorts.length > 0) {
            for (String hostsAndPort : hostsAndPorts) {
                String[] hp = hostsAndPort.split(":");
                httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
            }
            client = new RestHighLevelClient(
                    RestClient.builder(httpHosts.toArray(new HttpHost[0])));
        } else {
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        }
        return client;
    }
}

 

文件 API(High level rest 客戶端支援下面的 文件(Document) API):

  • 單文件 API:
  • index API
  • Get API
  • Delete API
  • Update API
  • 多文件 API:
  • Bulk API
  • Multi-Get API

1、Index API:
IndexRequest:
封裝好的參考方法:

private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
        IndexRequest indexRequest = null;
        if (null == index || null == indexType) {
            throw new ElasticsearchException("index or indexType must not be null");
        }
        if (null == docId) {
            indexRequest = new IndexRequest(index, indexType);
        } else {
            indexRequest = new IndexRequest(index, indexType, docId);
        }
        return indexRequest;
    }

    /**
     * 同步執行索引
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @throws IOException
     */
    public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
    }

    /**
     * 非同步執行
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param indexResponseActionListener
     * @throws IOException
     */
    public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
        getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
    }

API解釋:  

 

IndexRequest request = new IndexRequest(
        "posts",  // 索引 Index
        "doc",  // Type 
        "1");  // 文件 Document Id 
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON); // 文件源格式為 json string

Document Source
document source 可以是下面的格式

Map型別的輸入:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap);  // 會自動將 Map 轉換為 JSON 格式

XContentBuilder : 這是 Document Source 提供的幫助類,專門用來產生 json 格式的資料:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.timeField("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);

Object 鍵對:

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch"); 

同步索引:

IndexResponse indexResponse = client.index(request);

非同步索引:非同步執行函式需要新增 listener, 而對於 index 而言,這個 listener 的型別就是 ActionListener

client.indexAsync(request, listener); 

非同步方法執行後會立刻返回,在索引操作執行完成後,ActionListener 就會被回撥:

執行成功,呼叫 onResponse 函式
執行失敗,呼叫 onFailure 函式

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

IndexResponse:
不管是同步回撥還是非同步回撥,如果呼叫成功,都會返回 IndexRespose 物件。 

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
   // 文件第一次建立 
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
   // 文件之前已存在,當前是重寫
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片數量少於總分片數量 
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();  // 處理潛在的失敗資訊
    }
}

在索引時有版本衝突的話,會丟擲 ElasticsearchException

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1); // 這裡是文件版本號
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
       // 衝突了 
    }
}

如果將 opType 設定為 create, 而且如果索引的文件與已存在的文件在 index, type 和 id 上均相同,也會丟擲衝突異常。

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        
    }
}

2、GET API
GET 請求
每個 GET 請求都必須需傳入下面 3 個引數:

  • Index
  • Type
  • Document id
GetRequest getRequest = new GetRequest(
        "posts", 
        "doc",  
        "1");  

可選引數
下面的引數都是可選的, 裡面的選項並不完整,如要獲取完整的屬性,請參考 官方文件

不獲取源資料,預設是獲取的

request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); 

配置返回資料中包含指定欄位

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

配置返回資料中排除指定欄位

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

實時 預設為 true

request.realtime(false);

版本

request.version(2); 

版本型別

request.versionType(VersionType.EXTERNAL);

同步執行

GetResponse getResponse = client.get(getRequest);

非同步執行
此部分與 index 相似, 只有一點不同, 返回型別為 GetResponse

Get Response
返回的 GetResponse 物件包含要請求的文件資料(包含元資料和欄位)

 

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString(); // string 形式   
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map 
    byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 位元組形式 
} else {
   // 沒有發現請求的文件 
}

在請求中如果包含特定的文件版本,如果與已存在的文件版本不匹配, 就會出現衝突

try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本衝突        
    }
}
封裝好的參考方法:
  /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes  返回需要包含的欄位,可以傳入空
     * @param excludes  返回需要不包含的欄位,可以傳入為空
     * @param excludes  version
     * @param excludes  versionType
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.realtime(true);
        if (null != version) {
            getRequest.version(version);
        }
        if (null != versionType) {
            getRequest.versionType(versionType);
        }
        return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
        return getRequest(index, indexType, docId, includes, excludes, null, null);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        return getClient().get(getRequest);
    }

3、Exists API

如果文件存在 Exists API 返回 true, 否則返回 fasle。

Exists Request

GetRequest 用法和 Get API 差不多,兩個物件的可選引數是相同的。由於 exists() 方法只返回 true 或者 false, 建議將獲取 _source 以及任何儲存欄位的值關閉,儘量使請求輕量級。

GetRequest getRequest = new GetRequest(
    "posts",  // Index
    "doc",    // Type
    "1");     // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false));  // 禁用 _source 欄位
getRequest.storedFields("_none_"); // 禁止儲存任何欄位   

同步請求

boolean exists = client.exists(getRequest);

非同步請求
非同步請求與 Index API 相似,此處不贅述,只貼上程式碼。如需詳細瞭解,請參閱官方地址

ActionListener<Boolean> listener = new ActionListener<Boolean>() {
    @Override
    public void onResponse(Boolean exists) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.existsAsync(getRequest, listener); 

封裝的參考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public Boolean existDoc(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        return getClient().exists(getRequest);
    }

4、Delete API

Delete Request
DeleteRequest 必須傳入下面引數

DeleteRequest request = new DeleteRequest(
        "posts",   // index 
        "doc",     // doc
        "1");      // document id

可選引數
超時時間

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

重新整理策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");    

版本

request.version(2); 

版本型別

request.versionType(VersionType.EXTERNAL); 
同步執行
DeleteResponse deleteResponse = client.delete(request);

非同步執行

ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};


client.deleteAsync(request, listener);
Delete Response

 

DeleteResponse 可以檢索執行操作的資訊

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功分片數目小於總分片
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 處理潛在失敗
    }
}

也可以來檢查文件是否存在

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // 文件不存在
}
版本衝突時也會丟擲 `ElasticsearchException

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本衝突
    }
}

封裝好的參考方法:

  /**
     * @param index
     * @param indexType
     * @param docId
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
        if (null != timeValue) {
            deleteRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            deleteRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            deleteRequest.version(version);
        }
        if (null != versionType) {
            deleteRequest.versionType(versionType);
        }
        return getClient().delete(deleteRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
        return deleteDoc(index, indexType, docId, null, null, null, null);
    }

5、Update API

Update Request
UpdateRequest 的必需引數如下

UpdateRequest request = new UpdateRequest(
        "posts",  // Index
        "doc",  // 型別
        "1");   // 文件 Id

使用指令碼更新

部分文件更新:
在更新部分文件時,已存在文件與部分文件會合並。

部分文件可以有以下形式:

JSON 格式:

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); 

Map 格式:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); 

XContentBuilder 物件:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder);  
Object key-pairs

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

Upserts:如果文件不存在,可以使用 upserts 方法將文件以新文件的方式建立。

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

upserts 方法支援的文件格式與 update 方法相同。

可選引數:
超時時間

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s"); 

重新整理策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");  

衝突後重試次數

request.retryOnConflict(3);

獲取資料來源,預設是開啟的

request.fetchSource(true); 

包括特定欄位

String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

排除特定欄位

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

指定版本

request.version(2); 

禁用 noop detection

request.scriptedUpsert(true); 

 

設定如果更新的文件不存在,就必須要建立一個

request.docAsUpsert(true); 

同步執行

UpdateResponse updateResponse = client.update(request);

非同步執行

ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.updateAsync(request, listener); 

Update Response

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 文件已建立
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 文件已更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // 文件已刪除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // 文件不受更新的影響
}

如果在 UpdateRequest 中使能了獲取源資料,響應中則包含了更新後的源文件資訊。

GetResult result = updateResponse.getGetResult(); 
if (result.isExists()) {
    String sourceAsString = result.sourceAsString();  // 將獲取的文件以 string 格式輸出
    Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式輸出
    byte[] sourceAsBytes = result.source();  // 位元組形式
} else {
    // 預設情況下,不會返回文件源資料
}

也可以檢測是否分片失敗

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片數量小於總分片數量
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 得到分片失敗的原因
    }
}

如果在執行 UpdateRequest 時,文件不存在,響應中會包含 404 狀態碼,而且會丟擲 ElasticsearchException 。

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 處理文件不存在的情況
    }
}

如果版本衝突,也會丟擲 ElasticsearchException

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 處理版本衝突的情況
    }
}

封裝好的參考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @param docAsUpsert
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
        updateRequest.doc(dataMap);
        if (null != timeValue) {
            updateRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            updateRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            updateRequest.version(version);
        }
        if (null != versionType) {
            updateRequest.versionType(versionType);
        }
        updateRequest.docAsUpsert(docAsUpsert);
        //衝突時重試的次數
        updateRequest.retryOnConflict(3);
        if (null == includes && null == excludes) {
            return getClient().update(updateRequest);
        } else {
            if (null == includes || includes.length == 0) {
                includes = Strings.EMPTY_ARRAY;
            }
            if (null == excludes || excludes.length == 0) {
                excludes = Strings.EMPTY_ARRAY;
            }
            return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
        }
    }

    /**
     * 更新時不存在就插入
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null);
    }

    /**
     * 存在才更新
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null);
    }

 

6、Bulk API 批量處理

批量請求
使用 BulkRequest 可以在一次請求中執行多個索引,更新和刪除的操作。

BulkRequest request = new BulkRequest();  
request.add(new IndexRequest("posts", "doc", "1")  
        .source(XContentType.JSON,"field", "foo")); // 將第一個 IndexRequest 新增到批量請求中
request.add(new IndexRequest("posts", "doc", "2")  
        .source(XContentType.JSON,"field", "bar")); // 第二個
request.add(new IndexRequest("posts", "doc", "3")  
        .source(XContentType.JSON,"field", "baz")); // 第三個

在同一個 BulkRequest 也可以新增不同的操作型別

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可選引數
超時時間

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

重新整理策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for"); 

設定在批量操作前必須有幾個分片處於啟用狀態

 

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);  // 全部分片都處於啟用狀態
request.waitForActiveShards(ActiveShardCount.DEFAULT);  // 預設
request.waitForActiveShards(ActiveShardCount.ONE);  // 一個

同步請求

BulkResponse bulkResponse = client.bulk(request);

非同步請求

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.bulkAsync(request, listener); 

Bulk Response
BulkResponse 中包含執行操作後的資訊,並允許對每個操作結果迭代。

for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍歷所有的操作結果
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 獲取操作結果的響應,可以是  IndexResponse, UpdateResponse or DeleteResponse, 它們都可以慚怍是 DocWriteResponse 例項

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
        IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操作後的響應結果

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
        UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操作後的響應結果

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操作後的響應結果
    }
}

此外,批量響應還有一個非常便捷的方法來檢測是否有一個或多個操作失敗

if (bulkResponse.hasFailures()) { 
    // 表示至少有一個操作失敗
}

在這種情況下,我們要遍歷所有的操作結果,檢查是否是失敗的操作,並獲取對應的失敗資訊

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { // 檢測給定的操作是否失敗
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 獲取失敗資訊

    }
}

Bulk Processor
BulkProcessor 是為了簡化 Bulk API 的操作提供的一個工具類,要執行操作,就需要下面元件

RestHighLevelClient 用來執行 BulkRequest 並獲取 BulkResponse`
BulkProcessor.Listener 對 BulkRequest 執行前後以及失敗時監聽
BulkProcessor.builder 方法用來構建一個新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // 在每個 BulkRequest 執行前呼叫
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // 在每個 BulkRequest 執行後呼叫
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // 失敗時呼叫
    }
};

BulkProcessor.Builder 提供了多個方法來配置 BulkProcessor
如何來處理請求的執行。

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); // 指定多少操作時,就會重新整理一次
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0);  // 指定多大容量,就會重新整理一次
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 允許併發執行的數量 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 
BulkProcessor 建立後,各種請求就可以新增進去:

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

BulkProcessor 執行時,會對每個 bulk request呼叫 BulkProcessor.Listener , listener 提供了下面方法來訪問 BulkRequest 和 BulkResponse:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); // 在執行前獲取操作的數量
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { // 執行後檢視響應中是否包含失敗的操作
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); // 請求失敗時列印資訊
    }
};

請求新增到 BulkProcessor , 它的例項可以使用下面兩種方法關閉請求。

awaitClose() 在請求返回後或等待一定時間關閉

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

close() 立刻關閉

bulkProcessor.close();

兩個方法都會在關閉前對處理器中的請求進行重新整理,並避免新的請求新增進去。

封裝好的參考方法:

    /**
     * 批量操作
     *
     * @param indexBeanList
     * @param timeValue
     * @param refreshPolicy
     * @return
     * @throws IOException
     */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy) throws IOException {
        BulkRequest bulkRequest = getBulkRequest(indexBeanList);
        if (null != timeValue) {
            bulkRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            bulkRequest.setRefreshPolicy(refreshPolicy);
        }
        return getClient().bulk(bulkRequest);
    }

    private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) {
        BulkRequest bulkRequest = new BulkRequest();
        indexBeanList.forEach(indexBean -> {
            if ("1".equals(indexBean.getOperateType())) {
                bulkRequest.add(null != indexBean.getDocId() ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()) : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType()));
            } else if ("2".equals(indexBean.getOperateType())) {
                if ((null != indexBean.getDocId())) {
                    throw new ElasticsearchException("update action docId must not be null");
                }
                bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else if ("3".equals(indexBean.getOperateType())) {
                if ((null != indexBean.getDocId())) {
                    throw new ElasticsearchException("delete action docId must not be null");
                }
                bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else {
                throw new ElasticsearchException("OperateType" + indexBean.getOperateType() + "is not support");
            }
        });
        return bulkRequest;
    }

    /**
     * 批量操作
     *
     * @param indexBeanList
     * @return
     */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException {
        return bulkRequest(indexBeanList, null, null);
    }

    /**
     * 批量非同步操作
     *
     * @param indexBeanList
     * @param bulkResponseActionListener
     */

    public void AsyncBulkRequest(List<IndexBean> indexBeanList, ActionListener<BulkResponse> bulkResponseActionListener) {
        getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener);
    }

7、Search APIs:

Java High Level REST Client 支援下面的 Search API:

  • Search API
  • Search Scroll API
  • Clear Scroll API
  • Multi-Search API
  • Ranking Evaluation API

Search API
Search Request
searchRequest 用來完成和搜尋文件,聚合,建議等相關的任何操作同時也提供了各種方式來完成對查詢結果的高亮操作。

最基本的查詢操作如下

SearchRequest searchRequest = new SearchRequest(); 
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); 
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // 新增 match_all 查詢
searchRequest.source(searchSourceBuilder); // 將 SearchSourceBuilder  新增到 SeachRequest 中

可選引數

SearchRequest searchRequest = new SearchRequest("posts");  // 設定搜尋的 index
searchRequest.types("doc");  // 設定搜尋的 type

除了配置 index 和 type 外,還有一些其他的可選引數

searchRequest.routing("routing"); // 設定 routing 引數
searchRequest.preference("_local");  // 配置搜尋時偏愛使用本地分片,預設是使用隨機分片

什麼是 routing 引數?
當索引一個文件的時候,文件會被儲存在一個主分片上。在儲存時一般都會有多個主分片。Elasticsearch 如何知道一個文件應該放置在哪個分片呢?這個過程是根據下面的這個公式來決定的:

shard = hash(routing) % number_of_primary_shards
routing 是一個可變值,預設是文件的 _id ,也可以設定成一個自定義的值
number_of_primary_shards 是主分片數量
所有的文件 API 都接受一個叫做 routing 的路由引數,通過這個引數我們可以自定義文件到分片的對映。一個自定義的路由引數可以用來確保所有相關的文件——例如所有屬於同一個使用者的文件——都被儲存到同一個分片中。

使用 SearchSourceBuilder
對搜尋行為的配置可以使用 SearchSourceBuilder 來完成,來看一個例項

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();  // 預設配置
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // 設定搜尋,可以是任何型別的 QueryBuilder
sourceBuilder.from(0); // 起始 index
sourceBuilder.size(5); // 大小 size
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // 設定搜尋的超時時間

設定完成後,就可以新增到 SearchRequest 中。

SearchRequest searchRequest = new SearchRequest();
searchRequest.source(sourceBuilder);

構建查詢條件
查詢請求是通過使用 QueryBuilder 物件來完成的,並且支援 Query DSL。

DSL (domain-specific language) 領域特定語言,是指專注於某個應用程式領域的計算機語言。

可以使用建構函式來建立 QueryBuilder

MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy"); 

QueryBuilder 建立後,就可以呼叫方法來配置它的查詢選項:

matchQueryBuilder.fuzziness(Fuzziness.AUTO);  // 模糊查詢
matchQueryBuilder.prefixLength(3); // 字首查詢的長度
matchQueryBuilder.maxExpansions(10); // max expansion 選項,用來控制模糊查詢

也可以使用QueryBuilders 工具類來建立 QueryBuilder 物件。這個類提供了函數語言程式設計風格的各種方法用來快速建立 QueryBuilder 物件。

QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                                        .fuzziness(Fuzziness.AUTO)
                                                .prefixLength(3)
                                                .maxExpansions(10);

fuzzy-matching 拼寫錯誤時的匹配:

好的全文檢索不應該是完全相同的限定邏輯,相反,可以擴大範圍來包括可能的匹配,從而根據相關性得分將更好的匹配放在前面。

例如,搜尋 quick brown fox 時會匹配一個包含 fast brown foxes 的文件

不論什麼方式建立的 QueryBuilder ,最後都需要新增到 `SearchSourceBuilder 中

searchSourceBuilder.query(matchQueryBuilder);

構建查詢 文件中提供了一個豐富的查詢列表,裡面包含各種查詢對應的QueryBuilder 物件以及QueryBuilder helper 方法,大家可以去參考。

關於構建查詢的內容會在下篇文章中講解,敬請期待。

指定排序
SearchSourceBuilder 允許新增一個或多個SortBuilder 例項。這裡包含 4 種特殊的實現, (Field-, Score-, GeoDistance- 和 ScriptSortBuilder)

sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // 根據分數 _score 降序排列 (預設行為)
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));  // 根據 id 降序排列

過濾資料來源
預設情況下,查詢請求會返回文件的內容 _source ,當然我們也可以配置它。例如,禁止對 _source 的獲取

sourceBuilder.fetchSource(false);

也可以使用萬用字元模式以更細的粒度包含或排除特定的欄位:

String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);

高亮請求
可以通過在 SearchSourceBuilder 上設定 HighlightBuilder 完成對結果的高亮,而且可以配置不同的欄位具有不同的高亮行為。

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder highlightBuilder = new HighlightBuilder(); 
HighlightBuilder.Field highlightTitle =
        new HighlightBuilder.Field("title"); // title 欄位高亮
highlightTitle.highlighterType("unified");  // 配置高亮型別
highlightBuilder.field(highlightTitle);  // 新增到 builder
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
searchSourceBuilder.highlighter(highlightBuilder);

聚合請求
要實現聚合請求分兩步

建立合適的 `AggregationBuilder
作為引數配置在 `SearchSourceBuilder 上

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
        .field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age")
        .field("age"));
searchSourceBuilder.aggregation(aggregation);

建議請求 Requesting Suggestions

SuggestionBuilder 實現類是由 SuggestBuilders 工廠類來建立的。

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
SuggestionBuilder termSuggestionBuilder =
    SuggestBuilders.termSuggestion("user").text("kmichy"); 
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder); 
searchSourceBuilder.suggest(suggestBuilder);

對請求和聚合分析
分析 API 可用來對一個特定的查詢操作中的請求和聚合進行分析,此時要將SearchSourceBuilder 的 profile標誌位設定為 true

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.profile(true);

只要 SearchRequest 執行完成,對應的 SearchResponse 響應中就會包含 分析結果

同步執行
同步執行是阻塞式的,只有結果返回後才能繼續執行。

SearchResponse searchResponse = client.search(searchRequest);

非同步執行
非同步執行使用的是 listener 對結果進行處理。

ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse searchResponse) {
        // 查詢成功
    }

    @Override
    public void onFailure(Exception e) {
        // 查詢失敗
    }
};

SearchResponse

查詢執行完成後,會返回 SearchResponse 物件,並在物件中包含查詢執行的細節和符合條件的文件集合。

歸納一下, SerchResponse 包含的資訊如下

請求本身的資訊,如 HTTP 狀態碼,執行時間,或者請求是否超時

RestStatus status = searchResponse.status(); // HTTP 狀態碼
TimeValue took = searchResponse.getTook(); // 查詢佔用的時間
Boolean terminatedEarly = searchResponse.isTerminatedEarly(); // 是否由於 SearchSourceBuilder 中設定 terminateAfter 而過早終止
boolean timedOut = searchResponse.isTimedOut(); // 是否超時

查詢影響的分片數量的統計資訊,成功和失敗的分片

int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
    // failures should be handled here
}

檢索 SearchHits
要訪問返回的文件,首先要在響應中獲取其中的 SearchHits

SearchHits hits = searchResponse.getHits();

SearchHits 中包含了所有命中的全域性資訊,如查詢命中的數量或者最大分值:

long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();

查詢的結果巢狀在 SearchHits 中,可以通過遍歷迴圈獲取

SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
    // do something with the SearchHit
}
SearchHit 提供瞭如 index , type, docId 和每個命中查詢的分數
String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();

而且,還可以獲取到文件的源資料,以 JSON-String 形式或者 key-value map 對的形式。在 map 中,欄位可以是普通型別,或者是列表型別,巢狀物件。

String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject =
        (Map<String, Object>) sourceAsMap.get("innerObject");

Search API 查詢關係
上面的 QueryBuilder , SearchSourceBuilder 和 SearchRequest 之間都是巢狀關係, 可以參考下圖:

 

 

8、全文查詢 Full Text Queries

什麼是全文查詢?
像使用 match 或者 query_string 這樣的高層查詢都屬於全文查詢,

查詢 日期(date) 或整數(integer) 欄位,會將查詢字串分別作為日期或整數對待。
查詢一個( not_analyzed )未分析的精確值字串欄位,會將整個查詢字串作為單個詞項對待。
查詢一個( analyzed )已分析的全文欄位,會先將查詢字串傳遞到一個合適的分析器,然後生成一個供查詢的詞項列表
組成了詞項列表,後面就會對每個詞項逐一執行底層查詢,將查詢結果合併,並且為每個文件生成最終的相關度評分。

Match
match 查詢的單個詞的步驟是什麼?
檢查欄位型別,檢視欄位是 analyzed, not_analyzed
分析查詢字串,如果只有一個單詞項, match 查詢在執行時就會是單個底層的 term 查詢
查詢匹配的文件,會在倒排索引中查詢匹配文件,然後獲取一組包含該項的文件
為每個文件評分
構建 Match 查詢
match 查詢可以接受 text/numeric/dates 格式的引數,分析,並構建一個查詢。

GET /_search
{
    "query": {
        "match" : {
            "message" : "this is a test"
        }
    }
}

上面的例項中 message 是一個欄位名。

對應的 QueryBuilder class : MatchQueryBuilder

具體方法 : QueryBuilders.matchQuery()

全文查詢 API 列表

Search QueryQueryBuilder ClassMethod in QueryBuilders
Match MatchQueryBuilder QueryBuilders.matchQuery()
Match Phrase MatchPhraseQueryBuilder QueryBuilders.matchPhraseQuery()
Match Phrase Prefix MatchPhrasePrefixQueryBuilder QueryBuilders.matchPhrasePrefixQuery()
Multi Match MultiMatchQueryBuilder QueryBuilders.multiMatchQuery()
Common Terms CommonTermsQueryBuilder QueryBuilders.commonTermsQuery()
Query String QueryStringQueryBuilder QueryBuilders.queryStringQuery()
Simple Query String SimpleQueryStringBuilder QueryBuilders.simpleQueryStringQuery()

基於詞項的查詢
這種型別的查詢不需要分析,它們是對單個詞項操作,只是在倒排索引中查詢準確的詞項(精確匹配)並且使用 TF/IDF 演算法為每個包含詞項的文件計算相關度評分 _score。

Term
term 查詢可用作精確值匹配,精確值的型別則可以是數字,時間,布林型別,或者是那些 not_analyzed 的字串。

對應的 QueryBuilder class 是TermQueryBuilder

具體方法是 QueryBuilders.termQuery()

Terms
terms 查詢允許指定多個值進行匹配。如果這個欄位包含了指定值中的任何一個值,就表示該文件滿足條件。

對應的 QueryBuilder class 是 TermsQueryBuilder

具體方法是 QueryBuilders.termsQuery()

Wildcard
wildcard 萬用字元查詢是一種底層基於詞的查詢,它允許指定匹配的正則表示式。而且它使用的是標準的 shell 萬用字元查詢:

? 匹配任意字元
* 匹配 0 個或多個字元
wildcard 需要掃描倒排索引中的詞列表才能找到所有匹配的詞,然後依次獲取每個詞相關的文件 ID。

由於萬用字元和正則表示式只能在查詢時才能完成,因此查詢效率會比較低,在需要高效能的場合,應當謹慎使用。

對應的 QueryBuilder class 是 WildcardQueryBuilder

具體方法是 QueryBuilders.wildcardQuery()

基於詞項 API 列表

Search QueryQueryBuilder ClassMethod in QueryBuilders
Term TermQueryBuilder QueryBuilders.termQuery()
Terms TermsQueryBuilder QueryBuilders.termsQuery()
Range RangeQueryBuilder QueryBuilders.rangeQuery()
Exists ExistsQueryBuilder QueryBuilders.existsQuery()
Prefix PrefixQueryBuilder QueryBuilders.prefixQuery()
Wildcard WildcardQueryBuilder QueryBuilders.wildcardQuery()
Regexp RegexpQueryBuilder QueryBuilders.regexpQuery()
Fuzzy FuzzyQueryBuilder QueryBuilders.fuzzyQuery()
Type TypeQueryBuilder QueryBuilders.typeQuery()
Ids IdsQueryBuilder QueryBuilders.idsQuery()

複合查詢
什麼是複合查詢?
複合查詢會將其他的複合查詢或者葉查詢包裹起來,以巢狀的形式展示和執行,得到的結果也是對各個子查詢結果和分數的合併。可以分為下面幾種:

constant_score query

經常用在使用 filter 的場合,所有匹配的文件分數都是一個不變的常量

bool query

可以將多個葉查詢和組合查詢再組合起來,可接受的引數如下

must : 文件必須匹配這些條件才能被包含進來
must_not 文件必須不匹配才能被包含進來
should 如果滿足其中的任何語句,都會增加分數;即使不滿足,也沒有影響
filter 以過濾模式進行,不評分,但是必須匹配
dis_max query

叫做分離最大化查詢,它會將任何與查詢匹配的文件都作為結果返回,但是隻是將其中最佳匹配的評分作為最終的評分返回。

function_score query

允許為每個與主查詢匹配的文件應用一個函式,可用來改變甚至替換原始的評分

boosting query

用來控制(提高或降低)複合查詢中子查詢的權重。

Search QueryQueryBuilder ClassMethod in QueryBuilders
Constant Score ConstantScoreQueryBuilder QueryBuilders.constantScoreQuery()
Bool BoolQueryBuilder QueryBuilders.boolQuery()
Dis Max DisMaxQueryBuilder QueryBuilders.disMaxQuery()
Function Score FunctionScoreQueryBuilder QueryBuilders.functionScoreQuery()
Boosting BoostingQueryBuilder QueryBuilders.boostingQuery()

特殊查詢
Wrapper Query
這裡比較重要的一個是 Wrapper Query,是說可以接受任何其他 base64 編碼的字串作為子查詢。

主要應用場合就是在 Rest High-Level REST client 中接受 json 字串作為引數。比如使用 gson 等 json 庫將要查詢的語句拼接好,直接塞到 Wrapper Query 中查詢就可以了,非常方便。

Wrapper Query 對應的 QueryBuilder class 是WrapperQueryBuilder

具體方法是 QueryBuilders.wrapperQuery()

9、關於 REST Client的完整工具類程式碼

public class IndexBean {
    //index name
    private String index;
    //index type
    private String indexType;
    //index doc id
    private String docId;
    // 1 IndexRequest 2 UpdateRequest  3 DeleteRequest
    private String operateType;

    public String getOperateType() {
        return operateType;
    }

    public void setOperateType(String operateType) {
        this.operateType = operateType;
    }

    public String getIndex() {
        return index;
    }

    public void setIndex(String index) {
        this.index = index;
    }

    public String getIndexType() {
        return indexType;
    }

    public void setIndexType(String indexType) {
        this.indexType = indexType;
    }

    public String getDocId() {
        return docId;
    }

    public void setDocId(String docId) {
        this.docId = docId;
    }
}
/**
 * 自定義的es異常類
 */
public class ElasticsearchException extends RuntimeException {
    public ElasticsearchException(String s, Exception e) {
        super(s, e);
    }
    public ElasticsearchException(String s){
        super(s);
    }
}
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * es操作
 *  
 */
public class ElasticSearchClient {
    private String[] hostsAndPorts;

    public ElasticSearchClient(String[] hostsAndPorts) {
        this.hostsAndPorts = hostsAndPorts;
    }

    public RestHighLevelClient getClient() {
        RestHighLevelClient client = null;
        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        if (hostsAndPorts.length > 0) {
            for (String hostsAndPort : hostsAndPorts) {
                String[] hp = hostsAndPort.split(":");
                httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
            }
            client = new RestHighLevelClient(
                    RestClient.builder(httpHosts.toArray(new HttpHost[0])));
        } else {
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        }
        return client;
    }

    private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
        IndexRequest indexRequest = null;
        if (null == index || null == indexType) {
            throw new ElasticsearchException("index or indexType must not be null");
        }
        if (null == docId) {
            indexRequest = new IndexRequest(index, indexType);
        } else {
            indexRequest = new IndexRequest(index, indexType, docId);
        }
        return indexRequest;
    }

    /**
     * 同步執行索引
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @throws IOException
     */
    public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
    }

    /**
     * 非同步執行
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param indexResponseActionListener
     * @throws IOException
     */
    public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
        getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes  返回需要包含的欄位,可以傳入空
     * @param excludes  返回需要不包含的欄位,可以傳入為空
     * @param excludes  version
     * @param excludes  versionType
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.realtime(true);
        if (null != version) {
            getRequest.version(version);
        }
        if (null != versionType) {
            getRequest.versionType(versionType);
        }
        return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
        return getRequest(index, indexType, docId, includes, excludes, null, null);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        return getClient().get(getRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public Boolean existDoc(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest