1. 程式人生 > 實用技巧 >Elasticsearch Java High Level REST Client工具類

Elasticsearch Java High Level REST Client工具類

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the ultiwill-es7 module.
  Compiles for Java 8 and pulls in the Elasticsearch 7.8.0 high-level REST client,
  the low-level REST client, the Elasticsearch core artifact, SLF4J/Logback and fastjson.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ultiwill</groupId>
    <artifactId>ultiwill-es7</artifactId>
    <!-- The version must be a single token; it was previously split across a line break. -->
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <logback.version>1.2.3</logback.version>
        <slf4j.version>1.7.26</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- Elasticsearch clients; all three artifacts are kept on the same version. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>

        <!-- Logging -->
        <!--
          NOTE(review): only logback-core is declared here. SLF4J normally needs a binding
          (e.g. logback-classic) on the classpath to emit any output; confirm whether one is
          supplied elsewhere, otherwise logger calls may be silently dropped.
        -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.32</version>
        </dependency>
    </dependencies>
</project>
package com.ultiwill.utils;

import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author chong.zuo
 * @date 2020/8/3 17:18
 */
/**
 * Static helper around a single shared {@link RestHighLevelClient} and a single shared
 * {@link BulkProcessor}.
 *
 * <p>Both instances are created lazily on first use and cached in static fields. All
 * creation and shutdown paths synchronize on {@code EsClientUtil.class}; the cached
 * references are {@code volatile} so the unsynchronized fast-path reads are safe.
 */
public class EsClientUtil {
    private static final Logger logger = LoggerFactory.getLogger(EsClientUtil.class);

    /** Maximum time, in seconds, {@link #closeClient()} waits for in-flight bulk requests. */
    private static final long BULK_CLOSE_TIMEOUT_SECONDS = 30L;

    /**
     * Shared client. Per the original author's note, obtaining a client is slow (roughly
     * two seconds), so it is created once and reused until {@link #closeClient()} is called.
     * Declared {@code volatile} because it is read outside the lock in {@link #getClient()}.
     */
    private static volatile RestHighLevelClient client;

    /**
     * Shared bulk processor, bound to {@link #client}. Declared {@code volatile} because it
     * is read outside the lock in {@link #getBulkProcessor()}.
     */
    private static volatile BulkProcessor bulkProcessor;

    /**
     * Builds the list of Elasticsearch hosts the client connects to.
     *
     * <p>NOTE(review): the addresses are hard-coded; consider moving them to configuration.
     *
     * @return the two HTTP hosts (same address, ports 9201 and 9202)
     */
    private static HttpHost[] assembleESAddress() {
        return new HttpHost[] {
                new HttpHost("192.168.100.110", 9201, "http"),
                new HttpHost("192.168.100.110", 9202, "http")
        };
    }

    /**
     * Returns the shared client, creating it on first use.
     *
     * @return the shared client, or {@code null} if creation failed (the failure is logged)
     */
    public static RestHighLevelClient getClient() {
        // Fast path: one volatile read, no locking once the client exists.
        RestHighLevelClient current = client;
        if (current != null) {
            return current;
        }
        synchronized (EsClientUtil.class) {
            if (client == null) {
                try {
                    // NOTE(review): credentials are hard-coded in source; they should be
                    // supplied from external configuration and never committed.
                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(
                            AuthScope.ANY, new UsernamePasswordCredentials("elastic", "123456"));

                    // Attach the credentials to every request made by the underlying HTTP client.
                    RestClientBuilder builder = RestClient.builder(assembleESAddress())
                            .setHttpClientConfigCallback(httpClientBuilder ->
                                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
                    client = new RestHighLevelClient(builder);
                } catch (Exception e) {
                    logger.error("EsClient建立失敗...." + client, e);
                }
            }
            return client;
        }
    }


    /**
     * Shuts down the shared bulk processor (if any) and then the shared client (if any).
     *
     * <p>Both cached references are reset, so a later {@link #getClient()} or
     * {@link #getBulkProcessor()} call creates fresh instances instead of handing out
     * closed ones. Does nothing if neither was ever created.
     */
    public static void closeClient() {
        synchronized (EsClientUtil.class) {
            // The processor sends through the client, so it has to be drained first.
            closeBulkProcessor();

            if (client == null) {
                return;
            }
            try {
                client.close();
                logger.info("ES Client 關閉成功...");
            } catch (Exception e) {
                logger.error("ES Client關閉失敗...", e);
            } finally {
                // Never keep a reference to a client that has been (or failed to be) closed.
                client = null;
            }
        }
    }

    /**
     * Flushes and closes the cached bulk processor, waiting up to
     * {@link #BULK_CLOSE_TIMEOUT_SECONDS} for in-flight requests. Callers must hold the
     * {@code EsClientUtil.class} lock.
     */
    private static void closeBulkProcessor() {
        BulkProcessor processor = bulkProcessor;
        if (processor == null) {
            return;
        }
        bulkProcessor = null;
        try {
            if (!processor.awaitClose(BULK_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.warn("ES BulkProcessor did not complete pending requests within {} seconds",
                        BULK_CLOSE_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while closing ES BulkProcessor", e);
        } catch (Exception e) {
            logger.error("ES BulkProcessor close failed", e);
        }
    }


    /**
     * Indexes a single document synchronously. Failures are logged, not thrown.
     *
     * @param index target index name
     * @param id    document id
     * @param m     document source as field name to value
     */
    private static void saveData(String index, String id, Map<String, Object> m) {
        RestHighLevelClient esClient = getClient();
        if (esClient == null) {
            logger.error("ES client unavailable; document not indexed, index={}, id={}", index, id);
            return;
        }
        try {
            IndexRequest indexRequest = new IndexRequest(index)
                    .id(id)
                    .source(m);
            esClient.index(indexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            logger.error("ES failed to index document, index={}, id={}", index, id, e);
        }
    }


    /**
     * Returns the shared bulk processor, creating it on first use.
     *
     * @return the shared processor, or {@code null} if the client or the processor could
     *         not be created (the failure is logged)
     */
    public static BulkProcessor getBulkProcessor() {
        BulkProcessor current = bulkProcessor;
        if (current != null) {
            return current;
        }
        synchronized (EsClientUtil.class) {
            if (bulkProcessor == null) {
                try {
                    RestHighLevelClient esClient = getClient();
                    if (esClient == null) {
                        // Do not cache a processor that would fail on every flush.
                        logger.error("ES client unavailable; BulkProcessor not created");
                        return null;
                    }
                    bulkProcessor = bulkProcessor(esClient);
                } catch (Exception e) {
                    logger.error("BulkProcessor建立失敗...." + bulkProcessor, e);
                }
            }
            return bulkProcessor;
        }
    }

    /**
     * Builds a new {@link BulkProcessor} that sends its requests through the given client
     * via {@code bulkAsync}.
     *
     * <p>The processor flushes when 20000 actions or 10 MB are buffered, or every 300
     * seconds; it allows 5 concurrent requests and uses an exponential backoff policy
     * (100 ms initial delay, 3 retries).
     *
     * @param client client used to execute bulk requests; not validated here, so it must
     *               be non-null for flushes to succeed
     * @return a newly built processor (not cached by this method)
     */
    public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Invoked before each bulk request is executed.
                int numberOfActions = request.numberOfActions();
                logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Invoked after a bulk request returned a response (possibly with item failures).
                if (response.hasFailures()) {
                    logger.error("ES Bulk [{}] executed with failures: {}",
                            executionId, response.buildFailureMessage());
                } else {
                    logger.info("ES Bulk [{}] completed in {} milliseconds ", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Invoked when the whole bulk request failed.
                logger.error("ES Bulk Failed to execute bulk ", failure);
            }
        };

        return BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener)
                // Flush once this many actions are buffered.
                .setBulkActions(20000)
                // Flush once the buffered requests reach this size.
                .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                // Flush at this fixed interval regardless of buffer state.
                .setFlushInterval(TimeValue.timeValueSeconds(300))
                // Number of bulk requests allowed in flight concurrently.
                .setConcurrentRequests(5)
                // Retry policy for rejected bulk requests.
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }


    /**
     * Manual smoke test: indexes one sample document into {@code global_house_list} and
     * then closes the client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Fixed document id (the earlier timestamp-derived id was immediately overwritten by it).
        String id = "1596449389932";
        Map<String, Object> m = new HashMap<>(16);
        m.put("id", id);
        m.put("area_id", 1);
        m.put("camera_id", 1);
        m.put("log_time", new Date().toString());
        m.put("age", 1);
        try {
            EsClientUtil.saveData("global_house_list", id, m);
        } finally {
            EsClientUtil.closeClient();
        }
    }


}