ElasticSearch搜尋引擎API筆記
ElasticSearch搜尋引擎API筆記
1、 pom.xml
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.6.3</version>
</dependency>
2、 Client
1、Transport Client
(1)不設定叢集名稱
// on startup
//
此步驟新增
IP
,至少一個,如果設定了
"client.transport.sniff"= true
一個就夠了,因為添加了自動嗅探配置
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
// on shutdown
關閉
client
client.close();
(2)設定叢集名稱
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build(); //
設定
ES
例項的名稱
TransportClient client = new PreBuiltTransportClient(settings); //
自動嗅探整個叢集的狀態,把叢集中其他
ES
節點的
ip
新增到本地的客戶端列表中
//Add transport addresses and do something with the client...
(3)增加自動嗅探配置
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);
(4)其他配置
client.transport.ignore_cluster_name //
設定
true
,忽略連線節點叢集名驗證
client.transport.ping_timeout //ping
一個節點的響應時間
預設
5
秒
client.transport.nodes_sampler_interval //sample/ping
節點的時間間隔,預設是
5s
對於ES Client,有兩種形式,一個是TransportClient,一個是NodeClient。兩個的區別為:TransportClient作為一個外部訪問者,通過HTTP去請求ES的叢集,對於叢集而言,它是一個外部因素。 NodeClient顧名思義,是作為ES叢集的一個節點,它是ES中的一環,其他的節點對它是感知的,不像TransportClient那樣,ES叢集對它一無所知。NodeClient通訊的效能會更好,但是因為是ES的一環,所以它出問題,也會給ES叢集帶來問題。NodeClient可以設定不作為資料節點,在elasticsearch.yml中設定,這樣就不會在此節點上分配資料。
如果用ES的節點,大家仁者見仁智者見智,各按所需。
(5)例項
Settings esSettings = Settings.builder()
.put("cluster.name", clusterName) //
設定
ES
例項的名稱
.put("client.transport.sniff", true) //
自動嗅探整個叢集的狀態,把叢集中其他
ES
節點的
ip
新增到本地的客戶端列表中
.build();
client = new PreBuiltTransportClient(esSettings);//
初始化
client
較老版本發生了變化,此方法有幾個過載方法,初始化外掛等。
//
此步驟新增
IP
,至少一個,其實一個就夠了,因為添加了自動嗅探配置
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), esPort));
3、 XpackTransportclient
如果 ElasticSearch 服務安裝了 x-pack 外掛,需要PreBuiltXPackTransportClient例項才能訪問
使用Maven管理專案,把下面程式碼增加到pom.xml;
一定要修改預設倉庫地址為https://artifacts.elastic.co/maven ,因為這個庫沒有上傳到Maven中央倉庫
<project ...>
<repositories>
<!-- add the elasticsearch repo -->
<repository>
<id>elasticsearch-releases</id>
<url>https://artifacts.elastic.co/maven</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
...
</repositories>
...
<dependencies>
<!-- add the x-pack jar as a dependency -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>x-pack-transport</artifactId>
<version>5.6.3</version>
</dependency>
...
</dependencies>
...
</project>
例項
Settings settings =Settings.builder().put("cluster.name", "xxx")
.put("xpack.security.transport.ssl.enabled", false)
.put("xpack.security.user", "xxx:xxx")
.put("client.transport.sniff", true).build();
try {
client = new PreBuiltXPackTransportClient(settings)
.addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName("xxx.xxx.xxx.xxx"),9300))
.addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName("xxx.xxx.xxx.xxx"),9300));
} catch (UnknownHostException e) {
e.printStackTrace();
}
4、 Document APIs
1、 Index API
Index API 允許我們儲存一個JSON格式的文件,使資料可以被搜尋。文件通過index、type、id唯一確定。我們可以自己提供一個id,或者也使用Index API 為我們自動生成一個。
這裡有幾種不同的方式來產生JSON格式的文件(document):
手動方式,使用原生的byte[]或者String
· 使用Map方式,會自動轉換成與之等價的JSON
· 使用第三方庫來序列化beans,如Jackson
· 使用內建的幫助類 XContentFactory.jsonBuilder()
2、 手動方式
資料格式
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
例項
/**
*
手動生成
JSON
*/
@Test
public void CreateJSON(){
String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
Map方式
Map是key:value資料型別,可以代表json結構.
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
例項
/**
*
使用集合
*/
@Test
public void CreateList(){
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
序列化方式
ElasticSearch已經使用了jackson,可以直接使用它把javabean轉為json.
import com.fasterxml.jackson.databind.*;
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
例項
/**
*
使用
JACKSON
序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{
CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("
這是
JAVA
書籍
");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("
程式設計
");
csdn.setDate(new Date().toString());
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(csdn);
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
XcontentBuilder幫助類方式
ElasticSearch提供了一個內建的幫助類XContentBuilder來產生JSON文件
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
例項
/**
*
使用
ElasticSearch
幫助類
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();
IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("
建立成功
!");
}
綜合例項
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CreateIndex {
private TransportClient client;
@Before
public void getClient() throws Exception{
//
設定叢集名稱
Settings settings = Settings.builder().put("cluster.name", "my-application").build();//
叢集名
//
建立
client
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
}
/**
*
手動生成
JSON
*/
@Test
public void CreateJSON(){
String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
*
使用集合
*/
@Test
public void CreateList(){
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
*
使用
JACKSON
序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{
CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("
這是
JAVA
書籍
");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("
程式設計
");
csdn.setDate(new Date().toString());
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(csdn);
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
*
使用
ElasticSearch
幫助類
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();
IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("
建立成功
!");
}
}
你還可以通過startArray(string)和endArray()方法新增陣列。.field()方法可以接受多種物件型別。你可以給它傳遞數字、日期、甚至其他XContentBuilder物件。
2、get API
get api可以通過id檢視文件:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
配置執行緒
operationThreaded
設定為 true
是在不同的執行緒裡執行此次操作
下面的例子是operationThreaded
設定為 false
:
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
3、 Delete API
根據ID刪除:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
配置執行緒
operationThreaded
設定為 true
是在不同的執行緒裡執行此次操作
下面的例子是operationThreaded
設定為 false
:
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
4、 Delete By Query API
通過查詢條件刪除
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //
查詢條件
.source("persons") //index(
索引名
)
.get(); //
執行
long deleted = response.getDeleted(); //
刪除文件的數量
如果需要執行的時間比較長,可以使用非同步的方式處理,結果在回撥裡面獲取
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //
查詢
.source("persons") //index(
索引名
)
.execute(new ActionListener<BulkByScrollResponse>() { //
回撥監聽
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted(); //
刪除文件的數量
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});
5、 Update API
更新索引
有兩種方式更新索引:
建立 UpdateRequest,通過client傳送;
使用 prepareUpdate() 方法;
使用UpdateRequest
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
使用 prepareUpdate()
方法
client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"" ,ScriptService.ScriptType.INLINE, null, null))//
指令碼可以是本地檔案儲存的,如果使用檔案儲存的指令碼,需要設定
ScriptService.ScriptType.FILE
.get();
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() //
合併到現有文件
.startObject()
.field("gender", "male")
.endObject())
.get();
Update by script
使用指令碼更新文件
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
Update by mergingdocuments
合併文件
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
Upsert
更新插入,如果存在文件就更新,如果不存在就插入
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); //
如果不存在此文件
,就增加
`indexRequest`
client.update(updateRequest).get();
如果 index/type/1
存在,類似下面的文件:
{
"name" : "Joe Dalton",
"gender": "male"
}
如果不存在,會插入新的文件:
{
"name" : "Joe Smith",
"gender": "male"
}
6、 multi get Api
一次獲取多個文件
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1") //
一個
id
的方式
.add("twitter", "tweet", "2", "3", "4") //
多個
id
的方式
.add("another", "type", "foo") //
可以從另外一個索引獲取
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //
迭代返回值
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { //
判斷是否存在
String json = response.getSourceAsString(); //_source
欄位
}
}
7、 Bulk API
8、 Bulk API可以批量插入:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
9、 使用Bulk Processor
BulkProcessor 提供了一個簡單的介面,在給定的大小數量上定時批量自動請求
建立BulkProcessor
例項
首先建立BulkProcessor
例項
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //
增加
elasticsearch
客戶端
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } //
呼叫
bulk
之前執行
,例如你可以通過
request.numberOfActions()
方法知道
numberOfActions
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } //
呼叫
bulk
之後執行
,例如你可以通過
request.hasFailures()
方法知道是否執行失敗
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } //
呼叫失敗拋
Throwable
})
.setBulkActions(10000) //
每次
10000
請求
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //
拆成
5mb
一塊
.setFlushInterval(TimeValue.timeValueSeconds(5)) //
無論請求數量多少,每
5
秒鐘請求一次。
.setConcurrentRequests(1) //
設定併發請求的數量。值為
0
意味著只允許執行一個請求。值為
1
意味著允許
1
併發請求。
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//
設定自定義重複請求機制,最開始等待
100
毫秒,之後成倍更加,重試
3
次,當一次或多次重複請求失敗後因為計算資源不夠丟擲
EsRejectedExecutionException
異常,可以通過
BackoffPolicy.noBackoff()
方法關閉重試機制
.build();
BulkProcessor 預設設定
·