Elasticsearch Java Rest Client API 整理總結 (一)——Document API
目錄
引言
業餘時間搞 python 爬蟲爬取資料,完善我的小程式;工作時間還是要努力完成領導分配的任務,做我的 Java 老本行的。
這不,現在就有個需求,集團要將 elasticsearch 版本從 2.2 升級到 6.3, 由於之前做專案使用 spring data es
來完成 es 資料的增刪改查,現在一下升級到這麼高的版本,遇到各種 API 不相容的問題。並且 spring data es
由於整體框架 spring
等版本的限制,也不能使用了。
無奈之下,只能使用 elasticsearch 提供的 java reset client API 來完成之前的操作。工欲善其事,必先利其器
注意,本 API 指南只針對 elasticsearch 6.3 版本。
概述
Rest client 分成兩部分:
- Java Low Level REST Client
- 官方低級別 es 客戶端,使用 http 協議與 Elastiicsearch 叢集通訊,與所有 es 版本相容。
- Java High level REST Client
- 官方高級別 es 客戶端,基於低級別的客戶端,它會暴露 API 特定的方法。
High REST Client
High Client 基於 Low Client, 主要目的是暴露一些 API,這些 API 可以接受請求物件為引數,返回響應物件,而對請求和響應細節的處理都是由 client 自動完成的。
每個 API 在呼叫時都可以是同步或者非同步的。同步和非同步 API 的區別是什麼呢?
- 同步 API 會導致阻塞,一直等待資料返回
- 非同步 API 在命名上會加上
async
字尾,需要有一個listener
作為引數,等這個請求返回結果或者發生錯誤時,這個listener
就會被呼叫
起步
相容性
- java 1.8
- Elasticsearch 核心專案
Java Doc 地址
只有英文版
Maven 配置
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.2</version>
</dependency>
依賴
org.elasticsearch.client:elasticsearch-rest-client
org.elasticsearch:elasticsearch
初始化
RestHighLevelClient
例項依賴 REST low-level client builder
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
High-level client 會依賴 Low-level client 來執行請求, low-level client 則會維護一個請求的執行緒連線池,因為當 high-level 請求處理結束時,應該 close 掉這個連線,使 low-level client 能儘快釋放資源。
client.close();
文件 API
High level rest 客戶端支援下面的 文件(Document) API
- 單文件 API
- index API
- Get API
- Delete API
- Update API
- 多文件 API
- Bulk API
- Multi-Get API
Index API
IndexRequest
IndexRequest request = new IndexRequest(
"posts", // 索引 Index
"doc", // Type
"1"); // 文件 Document Id
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON); // 文件源格式為 json string
Document Source
document source 可以是下面的格式
- Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source(jsonMap); // 會自動將 Map 轉換為 JSON 格式
- XContentBuilder : 這是 Document Source 提供的幫助類,專門用來產生 json 格式的資料
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source(builder);
- Object 鍵對
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
同步索引
IndexResponse indexResponse = client.index(request);
非同步索引
前面已經講過,非同步執行函式需要新增 listener
, 而對於 index 而言,這個 listener
的型別就是 ActionListener
client.indexAsync(request, listener);
非同步方法執行後會立刻返回,在索引操作執行完成後,ActionListener
就會被回撥:
- 執行成功,呼叫
onResponse
函式 - 執行失敗,呼叫
onFailure
函式
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
IndexResponse
不管是同步回撥還是非同步回撥,如果呼叫成功,都會返回 IndexRespose
物件。 這個物件中包含什麼資訊呢?看下面程式碼
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 文件第一次建立
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 文件之前已存在,當前是重寫
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// 成功的分片數量少於總分片數量
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason(); // 處理潛在的失敗資訊
}
}
在索引時有版本衝突的話,會丟擲 ElasticsearchException
IndexRequest request = new IndexRequest("posts", "doc", "1")
.source("field", "value")
.version(1); // 這裡是文件版本號
try {
IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// 衝突了
}
}
如果將 opType
設定為 create
, 而且如果索引的文件與已存在的文件在 index, type 和 id 上均相同,也會丟擲衝突異常。
IndexRequest request = new IndexRequest("posts", "doc", "1")
.source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
}
GET API
GET 請求
每個 GET 請求都必須需傳入下面 3 個引數
- Index
- Type
- Document id
GetRequest getRequest = new GetRequest(
"posts",
"doc",
"1");
可選引數
下面的引數都是可選的, 裡面的選項並不完整,如要獲取完整的屬性,請參考 官方文件
- 不獲取源資料,預設是獲取的
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
- 配置返回資料中包含指定欄位
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
- 配置返回資料中排除指定欄位
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
- 實時 預設為 true
request.realtime(false);
- 版本
request.version(2);
- 版本型別
request.versionType(VersionType.EXTERNAL);
同步執行
GetResponse getResponse = client.get(getRequest);
非同步執行
此部分與 index 相似, 只有一點不同, 返回型別為 GetResponse
程式碼部分略
Get Response
返回的 GetResponse
物件包含要請求的文件資料(包含元資料和欄位)
String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString(); // string 形式
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map
byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 位元組形式
} else {
// 沒有發現請求的文件
}
在請求中如果包含特定的文件版本,如果與已存在的文件版本不匹配, 就會出現衝突
try {
GetRequest request = new GetRequest("posts", "doc", "1").version(2);
GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// 版本衝突
}
}
Exists API
如果文件存在 Exists API 返回 true
, 否則返回 fasle
。
Exists Request
GetRequest
用法和 Get API 差不多,兩個物件的可選引數是相同的。由於 exists()
方法只返回 true
或者 false
, 建議將獲取 _source
以及任何儲存欄位的值關閉,儘量使請求輕量級。
GetRequest getRequest = new GetRequest(
"posts", // Index
"doc", // Type
"1"); // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 欄位
getRequest.storedFields("_none_"); // 禁止儲存任何欄位
同步請求
boolean exists = client.exists(getRequest);
非同步請求
非同步請求與 Index API 相似,此處不贅述,只貼上程式碼。如需詳細瞭解,請參閱官方地址
ActionListener<Boolean> listener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean exists) {
}
@Override
public void onFailure(Exception e) {
}
};
client.existsAsync(getRequest, listener);
Delete API
Delete Request
DeleteRequest
必須傳入下面引數
DeleteRequest request = new DeleteRequest(
"posts", // index
"doc", // doc
"1"); // document id
可選引數
超時時間
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
重新整理策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
版本
request.version(2);
版本型別
request.versionType(VersionType.EXTERNAL);
同步執行
DeleteResponse deleteResponse = client.delete(request);
非同步執行
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.deleteAsync(request, listener);
Delete Response
DeleteResponse
可以檢索執行操作的資訊,如程式碼所示
String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// 成功分片數目小於總分片
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason(); // 處理潛在失敗
}
}
也可以來檢查文件是否存在
DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
// 文件不存在
}
版本衝突時也會丟擲 `ElasticsearchException
try {
DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// 版本衝突
}
}
Update API
Update Request
UpdateRequest
的必需引數如下
UpdateRequest request = new UpdateRequest(
"posts", // Index
"doc", // 型別
"1"); // 文件 Id
使用指令碼更新
部分文件更新
在更新部分文件時,已存在文件與部分文件會合並。
部分文件可以有以下形式
JSON 格式
UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
Map
格式
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc(jsonMap);
XContentBuilder
物件
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.timeField("updated", new Date());
builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc(builder);
Object
key-pairs
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc("updated", new Date(),
"reason", "daily update");
Upserts
如果文件不存在,可以使用 upserts
方法將文件以新文件的方式建立。
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc("updated", new Date(),
"reason", "daily update");
upserts
方法支援的文件格式與 update
方法相同。
可選引數
超時時間
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
重新整理策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
衝突後重試次數
request.retryOnConflict(3);
獲取資料來源,預設是開啟的
request.fetchSource(true);
包括特定欄位
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes));
排除特定欄位
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes));
指定版本
request.version(2);
禁用 noop detection
request.scriptedUpsert(true);
設定如果更新的文件不存在,就必須要建立一個
request.docAsUpsert(true);
同步執行
UpdateResponse updateResponse = client.update(request);
非同步執行
此處只貼程式碼,官方地址
ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.updateAsync(request, listener);
Update Response
String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// 文件已建立
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// 文件已更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
// 文件已刪除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
// 文件不受更新的影響
}
如果在 UpdateRequest
中使能了獲取源資料,響應中則包含了更新後的源文件資訊。
GetResult result = updateResponse.getGetResult();
if (result.isExists()) {
String sourceAsString = result.sourceAsString(); // 將獲取的文件以 string 格式輸出
Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式輸出
byte[] sourceAsBytes = result.source(); // 位元組形式
} else {
// 預設情況下,不會返回文件源資料
}
也可以檢測是否分片失敗
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// 成功的分片數量小於總分片數量
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason(); // 得到分片失敗的原因
}
}
如果在執行 UpdateRequest
時,文件不存在,響應中會包含 404
狀態碼,而且會丟擲 ElasticsearchException
。
UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
.doc("field", "value");
try {
UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// 處理文件不存在的情況
}
}
如果版本衝突,也會丟擲 ElasticsearchException
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc("field", "value")
.version(1);
try {
UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// 處理版本衝突的情況
}
}
Bulk API 批量處理
批量請求
使用 BulkRequest
可以在一次請求中執行多個索引,更新和刪除的操作。
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts", "doc", "1")
.source(XContentType.JSON,"field", "foo")); // 將第一個 IndexRequest 新增到批量請求中
request.add(new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON,"field", "bar")); // 第二個
request.add(new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON,"field", "baz")); // 第三個
在同一個 BulkRequest
也可以新增不同的操作型別
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3"));
request.add(new UpdateRequest("posts", "doc", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")
.source(XContentType.JSON,"field", "baz"));
可選引數
超時時間
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
重新整理策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
設定在批量操作前必須有幾個分片處於啟用狀態
request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL); // 全部分片都處於啟用狀態
request.waitForActiveShards(ActiveShardCount.DEFAULT); // 預設
request.waitForActiveShards(ActiveShardCount.ONE); // 一個
同步請求
BulkResponse bulkResponse = client.bulk(request);
非同步請求
與 GETAPI 等請求類似,只貼程式碼。
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.bulkAsync(request, listener);
Bulk Response
BulkResponse
中包含執行操作後的資訊,並允許對每個操作結果迭代。
for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍歷所有的操作結果
DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 獲取操作結果的響應,可以是 IndexResponse, UpdateResponse or DeleteResponse, 它們都可以慚怍是 DocWriteResponse 例項
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操作後的響應結果
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操作後的響應結果
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操作後的響應結果
}
}
此外,批量響應還有一個非常便捷的方法來檢測是否有一個或多個操作失敗
if (bulkResponse.hasFailures()) {
// 表示至少有一個操作失敗
}
在這種情況下,我們要遍歷所有的操作結果,檢查是否是失敗的操作,並獲取對應的失敗資訊
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) { // 檢測給定的操作是否失敗
BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 獲取失敗資訊
}
}
Bulk Processor
BulkProcessor
是為了簡化 Bulk API 的操作提供的一個工具類,要執行操作,就需要下面元件
RestHighLevelClient
用來執行BulkRequest
並獲取 BulkResponse`BulkProcessor.Listener
對BulkRequest
執行前後以及失敗時監聽
BulkProcessor.builder
方法用來構建一個新的BulkProcessor
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// 在每個 BulkRequest 執行前呼叫
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
// 在每個 BulkRequest 執行後呼叫
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// 失敗時呼叫
}
};
BulkProcessor bulkProcessor =
BulkProcessor.builder(client::bulkAsync, listener).build(); // 構建 BulkProcessor, RestHighLevelClient.bulkAsync() 用來執行 BulkRequest
BulkProcessor.Builder
提供了多個方法來配置 BulkProcessor
如何來處理請求的執行。
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); // 指定多少操作時,就會重新整理一次
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0); // 指定多大容量,就會重新整理一次
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 允許併發執行的數量
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
BulkProcessor
建立後,各種請求就可以新增進去:
IndexRequest one = new IndexRequest("posts", "doc", "1").
source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
BulkProcessor
執行時,會對每個 bulk request呼叫 BulkProcessor.Listener
, listener 提供了下面方法來訪問 BulkRequest
和 BulkResponse
:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions(); // 在執行前獲取操作的數量
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) { // 執行後檢視響應中是否包含失敗的操作
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure); // 請求失敗時列印資訊
}
};
請求新增到 BulkProcessor
, 它的例項可以使用下面兩種方法關閉請求。
awaitClose()
在請求返回後或等待一定時間關閉
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
close()
立刻關閉
bulkProcessor.close();
兩個方法都會在關閉前對處理器中的請求進行重新整理,並避免新的請求新增進去。
Multi-Get API
multiGet
API 可以在單個 http 互動中並行的執行多個 get
請求。
Muti-Get Request
MutiGetRequest
例項化時引數為空,例項化後可以通過新增 MultiGetRequest.Item
來配置獲取的資訊
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item(
"index", // 索引
"type", // 型別
"example_id")); // 文件 id
request.add(new MultiGetRequest.Item("index", "type", "another_id")); // 新增另外一個條目
可選引數
multiGet
支援的引數與 Get API 支援的可選引數是相同的,可以在 Item 上設定它們。
同步執行
構建 MultiGetRequest
後可以同步的方式執行multiGet
MultiGetResponse response = client.multiGet(request);
非同步執行
和上面的非同步執行一樣,也是使用 listener 機制。
ActionListener<MultiGetResponse> listener = new ActionListener<MultiGetResponse>() {
@Override
public void onResponse(MultiGetResponse response) {
}
@Override
public void onFailure(Exception e) {
}
};
client.multiGetAsync(request, listener);
Multi-Get Response
MultiGetResponse
中getResponse
方法包含的 MultiGetItemResponse
順序與請求時的相同。
MultiGetItemResponse
,如果執行成功,就會返回 GetResponse
物件,失敗則返回
MultiGetResponse.Failure
MultiGetItemResponse firstItem = response.getResponses()[0];
assertNull(firstItem.getFailure()); // 執行成功,則返回 null
GetResponse firstGet = firstItem.getResponse(); // 返回 GetResponse 物件
String index = firstItem.getIndex();
String type = firstItem.getType();
String id = firstItem.getId();
if (firstGet.isExists()) {
long version = firstGet.getVersion();
String sourceAsString = firstGet.getSourceAsString(); // string 格式
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap(); // Map
byte[] sourceAsBytes = firstGet.getSourceAsBytes(); // bytes
} else {
// 沒有發現文件
// 儘管響應中會返回 404 狀態碼,也會返回一個有效的 GetResponse
// 這是可以使用 isExists 方法來判斷
}
如果子請求中對應的 index 不存在,返回的 getFailure
方法中會包含 exception:
assertNull(missingIndexItem.getResponse()); // 獲取的響應為空
Exception e = missingIndexItem.getFailure().getFailure(); // 獲取 exception
ElasticsearchException ee = (ElasticsearchException) e;
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.NOT_FOUND, ee.status());
assertThat(e.getMessage(),
containsString("reason=no such index"));
對版本衝突時的處理,官方說明地址
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "type", "example_id")
.version(1000L));
MultiGetResponse response = client.multiGet(request);
MultiGetItemResponse item = response.getResponses()[0];
assertNull(item.getResponse());
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.CONFLICT, ee.status());
assertThat(e.getMessage(),
containsString("version conflict, current version [1] is "
+ "different than the one provided [1000]"));
結語
本文只包含 Java High level Rest Client 的 起步,和文件 API 部分,下篇文章中會包含查詢 API,敬請期待。