1. 程式人生 > >Elasticsearch 程式設計API入門系列

Elasticsearch 程式設計API入門系列

說明:我這裡是3臺機器組建的es叢集,然後編寫其程式碼!

  192.168.80.10、192.168.80.11、192.168.80.12

我這裡,elasticsearch用的是2.4.X版本。

API文件:https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.4/java-docs.html

刪除預設的App.java

以TestEs.java為例

專案結構


通過TransportClient這個類,指定es叢集中其中一臺或多臺機的ip地址和埠

     TransportClient client = TransportClient.builder().build().addTransportAddress(new InetSocketTransportAddress          (InetAddress.getByName("host1"), 9300)).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); 

  如果需要使用其他名稱的叢集(預設是elasticsearch),需要如下設定

    Settings settings = Settings.settingsBuilder().put("cluster.name", "myClusterName").build();

    TransportClientclient = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress    (InetAddress.getByName("host1"), 9300));

  通過TransportClient這個介面,自動嗅探整個叢集的狀態,es會自動把叢集中其它機器的ip地址加到客戶端中

    Settings settings = Settings.settingsBuilder().put("client.transport.sniff", true).build();
    TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress (InetAddress.getByName("host1"), 9300));


索引index(四種json,map,bean,es helper)

  IndexResponse response = client.prepareIndex("zhouls", "emp", "1").setSource().get()

查詢get

  GetResponse response = client.prepareGet("zhouls", "emp", "1").get();

更新update

刪除delete

  DeleteResponse response = client.prepareDelete("zhouls", "emp", "1").execute().actionGet();

總數count

  long count = client.prepareCount("zhouls").get().getCount();

前提

  準備,開啟3臺機器組建的es叢集程序


程式碼編寫:

package zhouls.bigdata.myElasticsearch;

import static org.junit.Assert.*;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.MatchQueryBuilder.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;

public class TestEs {
    
    //es和hadoop沒關係啊,獲取一個transportclient就可以操作es了

    
    private TransportClient transportClient;
    @Before//@Before和@Test的區別:每次執行都要先經過@Before,好比是,它是一個模板。
    //before表示在執行每個test方法之前執行,常與@Test搭配使用
    public void test0() throws Exception {
        //獲取TransportClient,來操作es
        transportClient = TransportClient.builder().build();
        //需要使用9300埠
        TransportAddress transportAddress = new InetSocketTransportAddress(InetAddress.getByName("192.168.80.10"), 9300);
        //新增節點資訊,最少指定叢集內的某一個節點即可操作這個es叢集
        transportClient.addTransportAddress(transportAddress);
    }
    
    /**
     * 用java程式碼測試的時候這樣寫是沒有問題的,比較簡單
     * @throws Exception
     */
    @Test
    public void test1() throws Exception {
        //獲取TransportClient,來操作es
        TransportClient transportClient = TransportClient.builder().build();
        //需要使用9300埠,指定es叢集中的節點資訊, 這個地方指定的埠是節點和節點之間的通訊埠是9300,不是Http請求的埠9200.
        TransportAddress transportAddress = new InetSocketTransportAddress(InetAddress.getByName("192.168.80.11"), 9300);
        //新增節點資訊,最少指定叢集內的某一個節點即可操作這個es叢集
        transportClient.addTransportAddress(transportAddress);
        System.out.println(transportClient.toString());
    }
    
    /**
     * 可以這樣寫,防止程式碼中指定的連結失效
     * 但是寫起來比較麻煩
     * 在實際工作中這樣寫不是很靠譜,需要完善,做測試可以
     * @throws Exception
     */
    @Test
    public void test2() throws Exception {
        //獲取TransportClient,來操作es,通過TransportClient可以和es叢集互動
        TransportClient transportClient = TransportClient.builder().build();
        //需要使用9300埠,指定es叢集中的節點資訊, 這個地方指定的埠是節點和節點之間的通訊埠是9300,不是Http請求的埠9200.
        TransportAddress transportAddress = new InetSocketTransportAddress(InetAddress.getByName("192.168.80.10"), 9300);
        TransportAddress transportAddress1 = new InetSocketTransportAddress(InetAddress.getByName("192.168.80.11"), 9300);
        TransportAddress transportAddress2 = new InetSocketTransportAddress(InetAddress.getByName("192.168.80.12"), 9300);
        //新增節點資訊,最少指定叢集內的某一個節點即可操作這個es叢集
        transportClient.addTransportAddresses(transportAddress,transportAddress1,transportAddress2);//加入多個地址
        System.out.println(transportClient.toString());
    }
    
    /**
     * 實際生產環境下面,建議這樣用,加上下面這些配置資訊
     * @throws Exception
     */
    @Test
    public void test3() throws Exception {
        //指定es的配置資訊 
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "elasticsearch")//叢集名稱
                        //如果叢集名稱在配置檔案中被修改了,那麼在這需要顯式定義一下
                        //es叢集名稱預設是 elasticsearch  sniff嗅; 發現;
                .put("client.transport.sniff", true)//開啟叢集的嗅探功能,只需要指定叢集中一個節點資訊即可獲取到叢集中的所有節點資訊
                        //開啟叢集的嗅探功能,這樣可以保證es會自動把叢集中的其他節點資訊新增到transportClient裡面
                        //開啟嗅探功能後 只要指定叢集中的任意一個可用節點就可以了.當把程式碼執行之後TransportClient裡面會把叢集中所有節點的資訊都拿到,能識別叢集中的所有節點.
                .build();
        
        //獲取TransportClient,來操作es,//通過TransportClient可以和es叢集互動
        TransportClient transportClient = TransportClient.builder().settings(settings).build();
        //需要使用9300埠,指定es叢集中的節點資訊, 這個地方指定的埠是節點和節點之間的通訊埠是9300,不是Http請求的埠9200.
        TransportAddress transportAddress = new InetSocketTransportAddress(InetAddress.getByName("192.168.80.10"), 9300);
        //新增節點資訊,最少指定叢集內的某一個節點即可操作這個es叢集
        transportClient.addTransportAddress(transportAddress);
        
        //獲取client連結到的節點資訊, //獲取當前transportClient連線到了叢集多少個節點
        List<DiscoveryNode> connectedNodes = transportClient.connectedNodes();
        for (DiscoveryNode discoveryNode : connectedNodes) {//for星型迴圈,將connectedNodes的值,一一傳給DiscoveryNode discoveryNode
            System.out.println(discoveryNode.getHostName());//列印192.168.80.10;192.168.80.11;192.168.80.12
            //如果加入transportClient.addTransportAddresses(transportAddress)  只有一個ip,列印的就只有一個.
        }
    }
    
    
    String index = "zhouls";//設定索引庫
    String type = "emp";//設定型別
    
    //索引index(四種格式:json,map,bean,es helper)

    /**
     * index-1 json
     * 實際工作中使用
     * @throws Exception
     */
    @Test
    public void test4() throws Exception {
        String jsonStr = "{\"name\":\"tom zhang\",\"age\":19}";//需要轉義下  //向索引庫中傳入一個String字串,還可以接受其他型別
        IndexResponse indexResponse = transportClient.prepareIndex(index, type, "1")//新增一個id=1的資料
                                                     .setSource(jsonStr)//設值,這是json格式的
                                                     .get();
                        //.execute().actionGet();   這個和上面的get()方法是一樣的,get()就是對.execute().actionGet() 進行了封裝
        System.out.println(indexResponse.getVersion());
                        //得到這個資料的version,如果version=1代表是新新增的資料
    }
    
    
    
    /**
     * index-2 hashmap
     * 實際工作中使用
     * @throws Exception
     */
    @Test
    public void test5() throws Exception {//把hashmap型別的資料放入index庫
        HashMap<String, Object> hashMap = new HashMap<String, Object>();
                //HashMap<String, Object> hashMap是迭代器變數
        hashMap.put("name", "tom");
        hashMap.put("age", 15);
        IndexResponse indexResponse = transportClient.prepareIndex(index, type, "2")//新增一個id=2的資料
                                                     .setSource(hashMap)//設值
                                                     .get();
                    //.execute().actionGet();   這個和上面的get()方法是一樣的,get()就是對.execute().actionGet() 進行了封裝
        System.out.println(indexResponse.getVersion());
    }
    
    
    
    
    /**
     * index-3 bean
     * 實際工作中使用
     * 使用物件的時候需要把物件中的屬性轉化成json字串
     * @throws Exception
     */
    
//    <dependency>
//                <groupId>com.fasterxml.jackson.core</groupId>
//                <artifactId>jackson-databind</artifactId>
//                <version>2.1.3</version>
//    </dependency>
    
    
    @Test
    public void test6() throws Exception {//傳入一個物件到index索引庫,這裡是Person物件
        Person person = new Person();
        person.setName("mack");
        person.setAge(20);
        
         //如果直接傳入一個person物件會報錯,java.lang.IllegalArgumentException,必須把物件轉換成一個Json字串,使用jackson依賴
        //IndexResponse indexResponse = transportClient.prepareIndex(index, type, "9").setSource(person).get();
        
        
        ObjectMapper objectMapper = new ObjectMapper();
        String writeValueAsString = objectMapper.writeValueAsString(person);
        IndexResponse indexResponse = transportClient.prepareIndex(index, type, "3")
                                                     .setSource(writeValueAsString)
                                                     .get();
//        IndexResponse indexResponse = transportClient.prepareIndex(index, type, "3").setSource(objectMapper.writeValueAsString(person)).get();
        
        System.out.println(indexResponse.getVersion());
    }
    
    /**
     * index -4 es helper
     * 測試資料這樣使用
     * @throws Exception
     */
    @Test
    public void test7() throws Exception {
        XContentBuilder builder = XContentFactory.jsonBuilder()//XContentFactory 這個是ES官方提供的可以構建Json字串的工具類.
                                                 .startObject()
                                                 .field("name", "jessic")
                                                 .field("age", 28)
                                                 .endObject();
        
        IndexResponse indexResponse = transportClient.prepareIndex(index, type, "4")
                                                     .setSource(builder)
                                                     .get();
        System.out.println(indexResponse.getVersion());
    }
    
    
    /**
     * get 查詢
     * 通過id查詢 
     * @throws Exception
     */
    @Test
    public void test8() throws Exception {
        GetResponse getResponse = transportClient.prepareGet(index, type, "4")//查詢id為4的資料
                                                 .get();
        System.out.println(getResponse.getSourceAsString());
    }
    
    /**
     * 區域性更新
     * @throws Exception
     */
    @Test
    public void test9() throws Exception {
        XContentBuilder builder = XContentFactory.jsonBuilder()//XContentFactory 這個是ES官方提供的可以構建Json字串的工具類.
                                                 .startObject()
                                                 .field("age", 29)
                                                 .endObject();
        
        UpdateResponse updateResponse = transportClient.prepareUpdate(index, type, "4")//更新id為4的資料
                                                 .setDoc(builder)
                                                 .get();
        System.out.println(updateResponse.getVersion());//version列印2 資料更新
    }
    
    /**
     * 刪除
     * 通過id刪除
     * @throws Exception
     */
    @Test
    public void test10() throws Exception {
            transportClient.prepareDelete(index, type, "4")//刪除id為4的資料
                        .get();
    }
    
    /**
     * count 取總數  類似於sql中的 select count(1) from table;
     * 求總數
     * 類似於mysql中的select count(*)
     */
    @Test
    public void test11() throws Exception {
        long count = transportClient.prepareCount(index)//查詢索引庫中的資料個數
                                    .setTypes(type)
                                    .get()
                                    .getCount();
        System.out.println(count);
    }
    
    
    /**
     * bulk 批量操作 適合初始化資料的時候使用,提高效率
     * 批量操作 bulk
     * @throws Exception
     */
    @Test
    public void test12() throws Exception {
        BulkRequestBuilder prepareBulk = transportClient.prepareBulk();
        
        //for迴圈執行----
        //index請求
        IndexRequest indexRequest = new IndexRequest(index, type, "10");
        indexRequest.source("{\"name\":\"zhangsan\",\"age\":17}");
        //delete請求
        DeleteRequest deleteRequest = new DeleteRequest(index, type, "1");
        
        
        prepareBulk.add(indexRequest );//bulkBuilder中可以新增多個操作,這裡一個是建立索引的操作.
        prepareBulk.add(deleteRequest);//一個是刪除的操作
        
        //執行 bulk
        BulkResponse bulkResponse = prepareBulk.get();
        if(bulkResponse.hasFailures()){//批量操作中可能有的操作會出現問題,這個地方對操作失敗的處理
            //有執行失敗的
            BulkItemResponse[] items = bulkResponse.getItems();
            for (BulkItemResponse bulkItemResponse : items) {
                //獲取失敗資訊,並列印
                System.out.println(bulkItemResponse.getFailureMessage());
            }
        }else{
            System.out.println("全部執行成功!");
        }
    }