Elasticsearch (5): using bulk operations from Java, and things to watch out for
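1. A BulkRequest object executes multiple index, update, or delete operations in a single request.
Different operation types may be mixed, so one request can contain index, update, and delete operations at the same time.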
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new DeleteRequest("posts", "doc", "300"));
bulkRequest.add(new UpdateRequest("posts", "doc", "2").doc(XContentType.JSON,"other", "test").fetchSource(true));
bulkRequest.add(new IndexRequest("posts", "doc", "4").source(XContentType.JSON,"field", "baz"));
2. BulkRequest settings: per-document requests are added with methods such as BulkRequest add(IndexRequest request); the remaining general settings are the same as for single-document operations:
bulkRequest.timeout(TimeValue.timeValueMinutes(2));
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
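Note that settings for an individual document operation belong on the request passed to add. For example, to have one update operation return the updated source, set .fetchSource(true) on that request: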
bulkRequest.add(new UpdateRequest("posts", "doc", "2").doc(XContentType.JSON,"other", "test").fetchSource(true));
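3. BulkResponse receives the result of the execution. It carries information about the executed operations and can be iterated to inspect the result of each one: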
for (BulkItemResponse bulkItemResponse : bulkResponse) {
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
        IndexResponse indexResponse = (IndexResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}
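Note: bulkItemResponse.getOpType() returns the operation type that was added to the request through add, not the operation that was actually applied to the document. Suppose the operation added to the request is: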
bulkRequest.add(new IndexRequest("posts", "doc2", "1").source(XContentType.JSON,"field", "foo"));
If the document does not exist, it is created automatically. The code below then runs, and reporting the document as created is correct:
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
    IndexResponse indexResponse = (IndexResponse) itemResponse;
    System.out.println("document id=" + indexResponse.getId() + " created successfully");
    System.out.println("document id=" + indexResponse.getId() + " operation result: " + indexResponse.getResult());
}
But if the document already exists, it is updated (overwritten) rather than created. The code above still runs, while the following check
bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE
returns false. To find out which operation was actually applied to the document, use:
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
IndexResponse indexResponse = (IndexResponse) itemResponse;
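DocWriteResponse.Result result = indexResponse.getResult(); // CREATED or UPDATED for an index request
Both itemResponse.getResult() and indexResponse.getResult() return the operation that was actually performed.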
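The difference between the requested operation and the actual result can be modelled without a running cluster. The following is only a minimal, self-contained sketch: the nested enums `RequestedOp` and `ActualResult` are hypothetical local stand-ins for `DocWriteRequest.OpType` and `DocWriteResponse.Result`, and `BulkOutcomeModel` and its methods are illustrative names that are not part of the Elasticsearch client.

```java
// Self-contained model; it does not use the Elasticsearch client.
class BulkOutcomeModel {
    // Hypothetical stand-in for DocWriteRequest.OpType (what was added to the bulk request).
    enum RequestedOp { INDEX, CREATE, UPDATE, DELETE }

    // Hypothetical stand-in for DocWriteResponse.Result (what actually happened to the document).
    enum ActualResult { CREATED, UPDATED, DELETED, NOT_FOUND, NOOP }

    // True when an INDEX request replaced a document that already existed.
    static boolean overwroteExisting(RequestedOp requested, ActualResult actual) {
        return requested == RequestedOp.INDEX && actual == ActualResult.UPDATED;
    }

    // Builds a log line that reports both the requested operation and the actual result.
    static String describe(String id, RequestedOp requested, ActualResult actual) {
        return "id=" + id + " requested=" + requested + " actual=" + actual;
    }

    public static void main(String[] args) {
        // Same request type, two different outcomes depending on whether id 1 already existed.
        System.out.println(describe("1", RequestedOp.INDEX, ActualResult.CREATED));
        System.out.println(describe("1", RequestedOp.INDEX, ActualResult.UPDATED));
        System.out.println(overwroteExisting(RequestedOp.INDEX, ActualResult.UPDATED));
    }
}
```

In the real loop the two inputs would come from bulkItemResponse.getOpType() and itemResponse.getResult(); logging both makes an overwrite visible even though the requested type stays INDEX.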
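4. If no document with id 1 exists on the Elasticsearch server, a document with id 1 is created automatically.
Likewise, if the posts index does not exist, it is created automatically from the index/type/id given in the request: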
bulkRequest.add(new IndexRequest("posts", "doc", "1").source(XContentType.JSON,"field", "foo"));
However, a request like the following fails if the posts index already contains documents of type doc:
bulkRequest.add(new IndexRequest("posts", "doc2", "1").source(XContentType.JSON,"field", "foo"));
The error message is:
Rejecting mapping update to [posts] as the final mapping would have more than 1 type: [doc2, doc]
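Cause: an index created in Elasticsearch 6.0 or later can hold only one type, so a second type cannot be created automatically under posts.
In code, this kind of illegal operation can be detected as follows: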
if (bulkItemResponse.getFailure() != null) {
    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
    if (failure.getStatus() == RestStatus.BAD_REQUEST) {
        System.out.println("id=" + bulkItemResponse.getId() + " is an illegal request!");
        continue;
    }
}
For an IndexRequest, if the document should only be created and must not be overwritten when it already exists, configure the request like this:
bulkRequest.add(new IndexRequest("posts", "doc", "5").source(XContentType.JSON,"field", "foo").opType(DocWriteRequest.OpType.CREATE));
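That is, add .opType(DocWriteRequest.OpType.CREATE), and check failure.getStatus() == RestStatus.CONFLICT so that an existing document is handled as an expected outcome instead of surfacing as an exception: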
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
    if (bulkItemResponse.getFailure() != null && bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT) {
        System.out.println("id=" + bulkItemResponse.getId() + " conflicts with an existing document");
        continue;
    }
    IndexResponse indexResponse = (IndexResponse) itemResponse;
    System.out.println("document id=" + indexResponse.getId() + " created successfully");
    System.out.println("document id=" + indexResponse.getId() + " operation result: " + itemResponse.getResult());
}
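The two status checks in this section amount to a three-way branch on the failure status. The sketch below models only that branch in a self-contained way, using the plain HTTP codes 400 and 409 that correspond to RestStatus.BAD_REQUEST and RestStatus.CONFLICT. `BulkFailureTriage`, `Action`, and `triage` are hypothetical names introduced for illustration, not part of the Elasticsearch client.

```java
// Self-contained model of the failure handling; it does not use the Elasticsearch client.
class BulkFailureTriage {
    // Hypothetical labels for what the caller does with a failed bulk item.
    enum Action { REPORT_ILLEGAL_REQUEST, SKIP_EXISTING_DOCUMENT, REPORT_OTHER_FAILURE }

    // Maps an HTTP status code to an action.
    static Action triage(int httpStatus) {
        switch (httpStatus) {
            case 400: // BAD_REQUEST, e.g. a second type in a single-type index
                return Action.REPORT_ILLEGAL_REQUEST;
            case 409: // CONFLICT, e.g. opType CREATE on an id that already exists
                return Action.SKIP_EXISTING_DOCUMENT;
            default:  // any other failure still has no usable item response
                return Action.REPORT_OTHER_FAILURE;
        }
    }

    public static void main(String[] args) {
        System.out.println(triage(400));
        System.out.println(triage(409));
        System.out.println(triage(500));
    }
}
```

The default branch matters: any failed item, whatever its status, should be reported and skipped before its response is cast to IndexResponse, UpdateResponse, or DeleteResponse.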
5. For delete operations, without an extra check the if branch below is always entered, even when the document does not exist:
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
To detect the case where the document does not exist, check the result as follows:
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
        System.out.println("document id=" + deleteResponse.getId() + " not found, nothing was deleted!");
    } else {
        System.out.println("document id=" + deleteResponse.getId() + " deleted successfully");
    }
}
6. Complete code example:
package com.example.elasticsearch.document;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
/**
* Created with IntelliJ IDEA.
*
* @Author: Weichang Zhong
* @Date: 2018/11/7
* @Time: 16:26
* @Description:
*/
public class SynBulkRequest {
    public static void main(String[] args) {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        )) {
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.add(new IndexRequest("posts", "doc", "5").source(XContentType.JSON,"field", "foo").opType(DocWriteRequest.OpType.CREATE));
            bulkRequest.add(new IndexRequest("posts2000", "doc", "2").source(XContentType.JSON,"field", "bar"));
            bulkRequest.add(new IndexRequest("posts", "doc", "3").source(XContentType.JSON,"field", "baz"));
            bulkRequest.add(new DeleteRequest("posts", "doc", "300"));
            bulkRequest.add(new UpdateRequest("posts", "doc", "2").doc(XContentType.JSON,"other", "test").fetchSource(true));
            bulkRequest.add(new IndexRequest("posts", "doc", "4").source(XContentType.JSON,"field", "baz"));
            bulkRequest.timeout(TimeValue.timeValueMinutes(2));
            bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                // getResponse() returns null for a failed item, so handle every failure before casting.
                if (bulkItemResponse.getFailure() != null) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    System.out.println(failure.getCause());
                    if (failure.getStatus() == RestStatus.BAD_REQUEST) {
                        System.out.println("id=" + bulkItemResponse.getId() + " is an illegal request!");
                    } else if (failure.getStatus() == RestStatus.CONFLICT) {
                        System.out.println("id=" + bulkItemResponse.getId() + " conflicts with an existing document");
                    }
                    continue;
                }
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                    IndexResponse indexResponse = (IndexResponse) itemResponse;
                    System.out.println("document id=" + indexResponse.getId() + " created successfully");
                    System.out.println("document id=" + indexResponse.getId() + " operation result: " + itemResponse.getResult());
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                    UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                    System.out.println("document id=" + updateResponse.getId() + " updated successfully");
                    System.out.println("document id=" + updateResponse.getId() + " content: " + updateResponse.getGetResult().sourceAsString());
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
                        System.out.println("document id=" + deleteResponse.getId() + " not found, nothing was deleted!");
                    } else {
                        System.out.println("document id=" + deleteResponse.getId() + " deleted successfully");
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
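Note: a bulk request cannot contain get operations. A get takes different parameters from the write operations, and BulkRequest.add accepts only index, update, and delete requests, so the following line is an error: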
bulkRequest.add(new GetRequest("posts", "doc", "22"));