elasticsearch(一)java 分別使用同步和非同步方法進行索引、更新操作
一、索引或更新基本步驟
1) 建立
與elasticsearch服務進行連線的RestHighLevelClient物件
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http")
)
);
2)將文件內容以一個XContentBuilder 物件的方式進行建立,elasticsearch內容助手會根據該物件自動生成json格式內容進行儲存
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("user", "kimchy"); builder.timeField("postDate", new Date()); builder.field("message", "trying out Elasticsearch"); } builder.endObject();
3) 建立IndexRequest
索引請求物件,並將XContentBuilder 作為引數傳入其source方法
IndexRequest indexRequest = new IndexRequest("posts", "doc", "16")
.source(builder);
posts為索引庫,doc為型別,1為指定的文件id
4)其它引數設定
indexRequest.timeout(TimeValue.timeValueSeconds(5)); indexRequest.opType(DocWriteRequest.OpType.INDEX);
a,如果操作設定為DocWriteRequest.OpType.INDEX(預設值),如果文件存在,則更新文件;如果文件不存在,則建立文件
b,如果操作設定為DocWriteRequest.OpType.CREATE,則是指定為建立文件操作,如果物件的文件(根據id判斷)存在,則報錯如下:
ElasticsearchStatusException[Elasticsearch exception
[type=version_conflict_engine_exception,
reason=[doc][16]: version conflict, document already exists
索引操作只能為以上兩種操作值,不能為UPDATE和DELETE
二、進行同步請求:使用client.index(indexRequest, RequestOptions.DEFAULT);方法
try {
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("新增成功");
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
System.out.println("index:" + index);
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("更新成功");
System.out.println("index:" + index);
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
}
}catch (ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
System.out.println("建立的文件與已存在的發生衝突");
}
}
三、非同步請求:
1)建立非同步請求的,回撥物件:ActionListener<IndexResponse>
如果執行成功,會自動呼叫onResponse方法,如果執行失敗,會回撥onFailure方法
可以從傳入的IndexResponse和Exception型別引數中獲取相關建立情況資訊
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("新增成功");
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
System.out.println("index:" + index);
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("更新成功");
System.out.println("index:" + index);
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
}
}
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
};
2)進行非同步請求:將請求物件、和回撥物件作為引數傳入
client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
三、完整示例程式碼如下:
1)同步方法程式碼示例:
package com.example.elasticsearch.main;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import java.util.Date;
/**
* Created with IntelliJ IDEA.
*
* @Author: Weichang Zhong
* @Date: 2018/11/6
* @Time: 15:16
* @Description:
*/
public class Test {
public static void main(String[] args) throws Exception{
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http")
)
);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "20")
.source(builder);
indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.opType(DocWriteRequest.OpType.INDEX);
try {
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("新增成功");
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
System.out.println("index:" + index);
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("更新成功");
System.out.println("index:" + index);
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
}
}catch (ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
System.out.println("建立的文件與已存在的發生衝突");
}
}
client.close();
}
}
2)非同步方法程式碼示例
package com.example.elasticsearch.main;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import java.util.Date;
/**
* Created with IntelliJ IDEA.
*
* @Author: Weichang Zhong
* @Date: 2018/11/6
* @Time: 15:16
* @Description:
*/
public class Test {
public static void main(String[] args) throws Exception{
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http")
)
);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "16")
.source(builder);
indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.opType(DocWriteRequest.OpType.CREATE);
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("新增成功");
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
System.out.println("index:" + index);
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("更新成功");
System.out.println("index:" + index);
System.out.println("type:" + type);
System.out.println("id:" + id);
System.out.println("version:" + version);
}
}
@Override
public void onFailure(Exception e) {
ElasticsearchException elasticsearchException = (ElasticsearchException)e;
if (elasticsearchException.status() == RestStatus.CONFLICT) {
System.out.println("建立的文件已存在");
}
}
};
client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
// client.close();
}
}
注意:
1,// client.close(); 如果不被註釋掉,可能還沒有將請求傳送出去,連線就會被關閉,從而建立或更新失敗。
所以如上程式碼的非同步請求中,將此行註釋掉以進行實驗
2,IndexRequest indexRequest = new IndexRequest("posts", "doc", null)
建立請求物件的id設定為null,則每次執行後都會自動生成一個新的id
英文連結: