
ElasticSearch Search Engine API Notes


1、 pom.xml

    <dependency>

       <groupId>org.elasticsearch.client</groupId>

        <artifactId>transport</artifactId>

        <version>5.6.3</version>

     </dependency>

2、 Client

1、Transport Client

(1) Without setting the cluster name

// on startup
 
// Add at least one transport address here; if "client.transport.sniff" is set to true, one is enough, because sniffing discovers the rest of the cluster
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
 
// on shutdown: close the client
 
client.close();

(2) Setting the cluster name

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();  //設定ES例項的名稱
TransportClient client = new PreBuiltTransportClient(settings);  //自動嗅探整個叢集的狀態,把叢集中其他ES節點的ip新增到本地的客戶端列表中
//Add transport addresses and do something with the client...

(3) Enabling automatic sniffing

Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

(4) Other settings

client.transport.ignore_cluster_name  // set to true to skip cluster-name validation of connected nodes
client.transport.ping_timeout       // time to wait for a ping response from a node, defaults to 5s
client.transport.nodes_sampler_interval // how often to sample/ping the connected nodes, defaults to 5s
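
A rough sketch of how these options might be applied when building the client (the values and the host name "host1" are just illustrative):

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName")
        .put("client.transport.sniff", true)
        .put("client.transport.ignore_cluster_name", false) // keep cluster-name validation
        .put("client.transport.ping_timeout", "5s")
        .put("client.transport.nodes_sampler_interval", "5s")
        .build();
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300));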

There are two kinds of ES client: TransportClient and NodeClient. The difference: a TransportClient is an external visitor that talks to the ES cluster remotely over the transport protocol (port 9300); from the cluster's point of view it is an outside party. A NodeClient, as the name suggests, joins the cluster as a node; it is part of ES, and the other nodes are aware of it, whereas the cluster knows nothing about a TransportClient. NodeClient communication performs better, but because it is part of the cluster, a problem with it also becomes a problem for the cluster. A NodeClient can be configured not to act as a data node (e.g. node.data: false in elasticsearch.yml) so that no data is allocated to it.

Whether to use a node client is a matter of taste; choose whichever fits your needs.

(5) Example

Settings esSettings = Settings.builder()
 
    .put("cluster.name", clusterName) // set the cluster name
 
    .put("client.transport.sniff", true) // sniff the state of the whole cluster and add the other ES nodes' IPs to the local client list
 
    .build();
 
    client = new PreBuiltTransportClient(esSettings); // client initialization changed from older versions; this constructor has several overloads, e.g. for registering plugins
 
    // Add at least one transport address; one is enough here because sniffing is enabled
 
    client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), esPort));

3、 XPackTransportClient

If the ElasticSearch service has the x-pack plugin installed, you need a PreBuiltXPackTransportClient instance to access it.

If you manage the project with Maven, add the following to pom.xml.

Be sure to set the repository address to https://artifacts.elastic.co/maven, because this artifact has not been uploaded to Maven Central.

<project ...>

   <repositories>

     <!-- add the elasticsearch repo -->

     <repository>

        <id>elasticsearch-releases</id>

        <url>https://artifacts.elastic.co/maven</url>

        <releases>

           <enabled>true</enabled>

        </releases>

        <snapshots>

           <enabled>false</enabled>

        </snapshots>

     </repository>

     ...

  </repositories>

   ...

  <dependencies>

     <!-- add the x-pack jar as a dependency -->

     <dependency>

        <groupId>org.elasticsearch.client</groupId>

        <artifactId>x-pack-transport</artifactId>

        <version>5.6.3</version>

     </dependency>

     ...

  </dependencies>

   ...

 </project>

Example

Settings settings =Settings.builder().put("cluster.name", "xxx")

                   .put("xpack.security.transport.ssl.enabled", false)

                   .put("xpack.security.user", "xxx:xxx")

                   .put("client.transport.sniff", true).build();

try {

   client = new PreBuiltXPackTransportClient(settings)

           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("xxx.xxx.xxx.xxx"), 9300))

           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("xxx.xxx.xxx.xxx"), 9300));

} catch (UnknownHostException e) {

   e.printStackTrace();

}

4、 Document APIs

1、 Index API

The Index API lets us store a JSON document so that the data becomes searchable. A document is uniquely identified by index, type, and id. We can supply our own id, or let the Index API generate one for us.
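
For example (a rough sketch; the index name "twitter", type "tweet", and the JSON body are only placeholders), indexing with an explicit id versus letting ES generate one:

String json = "{\"user\":\"kimchy\",\"message\":\"trying out Elasticsearch\"}";

// supply our own id "1"
IndexResponse r1 = client.prepareIndex("twitter", "tweet", "1")
        .setSource(json)
        .get();

// no id supplied: ES generates one, available via r2.getId()
IndexResponse r2 = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();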

There are several different ways to produce a JSON document:

·       Manually, using a native byte[] or String

·       Using a Map, which is automatically converted to its JSON equivalent

·       Using a third-party library such as Jackson to serialize your beans

·       Using the built-in helper XContentFactory.jsonBuilder()

2、 Manual way

   Data format:

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

Example

/**  
 * Build the JSON by hand
 */  
@Test  
public void CreateJSON(){  
 
    String json = "{" +  
            "\"user\":\"fendo\"," +  
            "\"postDate\":\"2013-01-30\"," +  
            "\"message\":\"Hell word\"" +  
        "}";  
 
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
 
}

Map way

A Map is a key:value data structure and can represent a JSON object.

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");

Example

/**  
 * Using a Map
 */  
@Test  
public void CreateList(){  
 
    Map<String, Object> json = new HashMap<String, Object>();  
    json.put("user","kimchy");  
    json.put("postDate","2013-01-30");  
    json.put("message","trying out Elasticsearch");  
 
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
 
}
Serialization way
ElasticSearch already uses Jackson, so you can use it directly to convert a Java bean to JSON.
import com.fasterxml.jackson.databind.*;
 
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
 
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
Example
/**  
 * Serialize with Jackson
 * @throws Exception  
 */  
@Test  
public void CreateJACKSON() throws Exception{  
 
    CsdnBlog csdn=new CsdnBlog();  
    csdn.setAuthor("fendo");  
    csdn.setContent("這是JAVA書籍");  
    csdn.setTag("C");  
    csdn.setView("100");  
    csdn.setTitile("程式設計");  
    csdn.setDate(new Date().toString());  
 
    // instance a json mapper  
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
 
    // generate json  
    byte[] json = mapper.writeValueAsBytes(csdn);  
 
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
}
XContentBuilder helper way
ElasticSearch provides the built-in helper class XContentBuilder for producing JSON documents. (The snippet below actually lists the fields you can read from the IndexResponse returned by an index operation; the XContentBuilder usage itself is shown in the example that follows.)
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status holds the REST status of the operation
RestStatus status = response.status();
Example
/**  
 * Using the ElasticSearch helper class
 * @throws IOException   
 */  
@Test  
public void CreateXContentBuilder() throws IOException{  
 
    XContentBuilder builder = XContentFactory.jsonBuilder()  
            .startObject()  
                .field("user", "ccse")  
                .field("postDate", new Date())  
                .field("message", "this is Elasticsearch")  
            .endObject();  
 
    IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
    System.out.println("建立成功!");  
 
 
}
Full example
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.util.Date;  
import java.util.HashMap;  
import java.util.Map;  
 
import org.elasticsearch.action.index.IndexResponse;  
import org.elasticsearch.client.transport.TransportClient;  
import org.elasticsearch.common.settings.Settings;  
import org.elasticsearch.common.transport.InetSocketTransportAddress;  
import org.elasticsearch.common.xcontent.XContentBuilder;  
import org.elasticsearch.common.xcontent.XContentFactory;  
import org.elasticsearch.transport.client.PreBuiltTransportClient;  
import org.junit.Before;  
import org.junit.Test;  
 
import com.fasterxml.jackson.core.JsonProcessingException;  
import com.fasterxml.jackson.databind.ObjectMapper;  
 
public class CreateIndex {  
 
    private TransportClient client;  
 
    @Before  
    public void getClient() throws Exception{  
        // set the cluster name  
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        // create the client  
        client = new PreBuiltTransportClient(settings)  
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));  
    }
 
    /**  
     * Build the JSON by hand
     */  
    @Test  
    public void CreateJSON(){  
 
        String json = "{" +  
                "\"user\":\"fendo\"," +  
                "\"postDate\":\"2013-01-30\"," +  
                "\"message\":\"Hell word\"" +  
            "}";  
 
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  

    }
 
 
    /**  
     * Using a Map
     */  
    @Test  
    public void CreateList(){  
 
        Map<String, Object> json = new HashMap<String, Object>();  
        json.put("user","kimchy");  
        json.put("postDate","2013-01-30");  
        json.put("message","trying out Elasticsearch");  
 
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  

    }
 
    /**  
     * Serialize with Jackson
     * @throws Exception  
     */  
    @Test  
    public void CreateJACKSON() throws Exception{  
 
        CsdnBlog csdn=new CsdnBlog();  
        csdn.setAuthor("fendo");  
        csdn.setContent("這是JAVA書籍");  
        csdn.setTag("C");  
        csdn.setView("100");  
        csdn.setTitile("程式設計");  
        csdn.setDate(new Date().toString());  
 
        // instance a json mapper  
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
 
        // generate json  
        byte[] json = mapper.writeValueAsBytes(csdn);  
 
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
    }
 
    /**  
     * Using the ElasticSearch helper class
     * @throws IOException   
     */  
    @Test  
    public void CreateXContentBuilder() throws IOException{  
 
        XContentBuilder builder = XContentFactory.jsonBuilder()  
                .startObject()  
                    .field("user", "ccse")  
                    .field("postDate", new Date())  
                    .field("message", "this is Elasticsearch")  
                .endObject();  
 
        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
        System.out.println("建立成功!");  
 
 
    
 
}
You can also add arrays with the startArray(String) and endArray() methods. The .field() method accepts many object types: you can pass it numbers, dates, and even other XContentBuilder objects.
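A small sketch of that (the "tags" array field is made up for illustration):
XContentBuilder builder = XContentFactory.jsonBuilder()
        .startObject()
            .field("user", "ccse")
            .field("postDate", new Date())   // .field() accepts dates, numbers, etc.
            .startArray("tags")              // add an array with startArray()/endArray()
                .value("search")
                .value("elasticsearch")
            .endArray()
        .endObject();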
2、 get API
The get API lets you retrieve a document by id:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

Thread configuration

Setting operationThreaded to true executes the operation on a different thread.

The example below sets operationThreaded to false:

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
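
A minimal sketch of reading the result of the get above:

if (response.isExists()) {                                     // the document was found
    String source = response.getSourceAsString();              // the raw _source as a JSON string
    Map<String, Object> sourceMap = response.getSourceAsMap(); // or as a Map
}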
 
3、 Delete API

Delete by id:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

Thread configuration

Setting operationThreaded to true executes the operation on a different thread.

The example below sets operationThreaded to false:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
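
A minimal sketch of checking the outcome of the delete above:

System.out.println(response.getResult()); // DELETED if the document existed, NOT_FOUND otherwise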
4、 Delete By Query API

Delete documents that match a query:

BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) // query condition
        .source("persons")                                  // index name
        .get();                                             // execute
 
long deleted = response.getDeleted();                       // number of documents deleted

If the operation may take a long time, you can run it asynchronously and read the result in a callback:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))        // query condition
    .source("persons")                                         // index name
    .execute(new ActionListener<BulkByScrollResponse>() {      // callback listener
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();              // number of documents deleted
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
5、 Update API

Updating documents

There are two ways to update a document:

   Create an UpdateRequest and send it with the client;

   Use the prepareUpdate() method.

Using UpdateRequest

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

Using the prepareUpdate() method

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script(ScriptType.INLINE, "painless", "ctx._source.gender = \"male\"", Collections.emptyMap())) // a script can also be stored in a file on the nodes; for a file-based script use ScriptType.FILE
        .get();
 
client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()   // merged into the existing document
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Update by script

Update a document with a script:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

Merge a partial document into an existing one:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

Upsert: if the document exists it is updated, otherwise it is inserted.

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest); // if the document does not exist, the content of indexRequest is indexed instead
client.update(updateRequest).get();

If index/type/1 already exists, then after this operation the document will look like the following:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

If it does not exist, a new document is inserted:

{
    "name" : "Joe Smith",
    "gender": "male"
}
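
A small sketch of telling the two cases apart from the response (getResult() reports how the upsert was applied):

UpdateResponse response = client.update(updateRequest).get();
System.out.println(response.getResult()); // CREATED if the upsert inserted a new document, UPDATED if it merged into an existing one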

6、 Multi Get API

Fetch multiple documents in one request:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           // by a single id
    .add("twitter", "tweet", "2", "3", "4") // or by a list of ids
    .add("another", "type", "foo")          // you can also fetch from another index
    .get();
 
for (MultiGetItemResponse itemResponse : multiGetItemResponses) { // iterate over the responses
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      // check that the document exists
        String json = response.getSourceAsString(); // the _source field
    }
}

7、 Bulk API

The Bulk API lets you index multiple documents in a single request:

import static org.elasticsearch.common.xcontent.XContentFactory.*;
 
BulkRequestBuilder bulkRequest = client.prepareBulk();
 
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );
 
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );
 
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
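
A rough sketch of how the failed items could be inspected (BulkResponse is iterable over its BulkItemResponse items):

if (bulkResponse.hasFailures()) {
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            System.out.println(item.getId() + " failed: " + item.getFailureMessage());
        }
    }
}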

8、 Using the Bulk Processor

The BulkProcessor offers a simple interface that automatically flushes bulk requests based on the number or size of queued requests, or after a given period.

Creating a BulkProcessor instance

First create the BulkProcessor instance:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
 
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  // your elasticsearch client
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } // called before each bulk executes; e.g. request.numberOfActions() gives the number of actions
 
            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } // called after each bulk executes; e.g. response.hasFailures() tells you whether anything failed
 
            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } // called when the bulk failed with a Throwable
        })
        .setBulkActions(10000) // flush the bulk every 10,000 requests
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush the bulk every 5 MB
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of the number of requests
        .setConcurrentRequests(1) // number of concurrent requests: 0 means bulks execute synchronously one at a time, 1 allows one bulk to execute while new requests accumulate
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // custom retry policy: wait 100 ms at first, doubling each time, for up to 3 retries, whenever a bulk fails with an EsRejectedExecutionException (insufficient compute resources); disable with BackoffPolicy.noBackoff()
        .build();

BulkProcessor default settings

·       bulkActions: 1000 (flush every 1000 requests)

·       bulkSize: 5 MB

·       flushInterval: not set

·       concurrentRequests: 1

·       backoffPolicy: exponential backoff with a 50 ms initial delay and up to 8 retries
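
A rough sketch of how the processor is then used (index/type/id values are placeholders): add requests to it and close it when you are done.

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1")
        .source(jsonBuilder().startObject().field("user", "kimchy").endObject()));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

bulkProcessor.flush();                            // force a flush of anything still queued
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);   // wait for in-flight bulks and close (throws InterruptedException)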