Elasticsearch Java High Level REST Client工具類
阿新 • • 發佈:2020-08-04
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the ultiwill-es7 module: a small Java 8 utility
  built on the Elasticsearch High Level REST Client, logging through
  SLF4J + Logback.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ultiwill</groupId>
    <artifactId>ultiwill-es7</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <!-- Compile for Java 8 (the sources use lambdas). -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <!-- The sources contain non-ASCII string literals; pin the encoding so
             the build does not depend on the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to bump the three Elasticsearch artifacts, which must
             stay on the same version. -->
        <elasticsearch.version>7.8.0</elasticsearch.version>
        <logback.version>1.2.3</logback.version>
        <slf4j.version>1.7.26</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- Elasticsearch REST clients and core classes -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <!-- logback-core alone provides no SLF4J binding; without
             logback-classic, slf4j-api 1.7.x falls back to a no-op logger and
             every log statement is silently dropped. -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- JSON -->
        <!-- NOTE(review): fastjson 1.2.32 is an old release; later versions
             contain fixes for autoType deserialization issues. Consider
             upgrading, or removing the dependency if nothing uses it. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.32</version>
        </dependency>
    </dependencies>
</project>
package com.ultiwill.utils;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Static helper around the Elasticsearch High Level REST Client.
 *
 * <p>Holds one lazily created {@link RestHighLevelClient} and one lazily created
 * {@link BulkProcessor} for the whole JVM. Both are created on first use and kept
 * until {@link #closeClient()} is called; after that the next accessor call builds
 * fresh instances.
 *
 * @author chong.zuo
 * @date 2020/8/3 17:18
 */
public class EsClientUtil {

    private static final Logger logger = LoggerFactory.getLogger(EsClientUtil.class);

    // NOTE(review): endpoint and credentials are compiled in. They are kept here
    // unchanged so behavior is identical, but they should come from external
    // configuration rather than source control.
    private static final String ES_HOST = "192.168.100.110";
    private static final int[] ES_PORTS = {9201, 9202};
    private static final String ES_SCHEME = "http";
    private static final String ES_USERNAME = "elastic";
    private static final String ES_PASSWORD = "123456";

    /** Upper bound on how long {@link #closeClient()} waits for pending bulk requests. */
    private static final long BULK_CLOSE_TIMEOUT_SECONDS = 30L;

    /**
     * Building a client is slow (about two seconds according to the original author),
     * so it is created once and reused. {@code volatile} is required because the field
     * is read outside the lock in {@link #getClient()}.
     */
    private static volatile RestHighLevelClient client;

    /** Shared bulk processor; {@code volatile} for the same reason as {@link #client}. */
    private static volatile BulkProcessor bulkProcessor;

    /**
     * Builds the list of Elasticsearch nodes the REST client connects to.
     *
     * @return one {@link HttpHost} per configured port on {@link #ES_HOST}
     */
    private static HttpHost[] assembleESAddress() {
        HttpHost[] hosts = new HttpHost[ES_PORTS.length];
        for (int i = 0; i < ES_PORTS.length; i++) {
            hosts[i] = new HttpHost(ES_HOST, ES_PORTS[i], ES_SCHEME);
        }
        return hosts;
    }

    /**
     * Returns the shared client, creating it on first use.
     *
     * @return the shared client, or {@code null} if it could not be created
     *         (the failure is logged)
     */
    public static RestHighLevelClient getClient() {
        RestHighLevelClient current = client;
        if (current == null) {
            synchronized (EsClientUtil.class) {
                // Re-read under the lock: another thread may have created it meanwhile.
                current = client;
                if (current == null) {
                    try {
                        // Basic-auth credentials applied to every request.
                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(
                                AuthScope.ANY,
                                new UsernamePasswordCredentials(ES_USERNAME, ES_PASSWORD));

                        RestClientBuilder builder = RestClient.builder(assembleESAddress())
                                .setHttpClientConfigCallback((HttpAsyncClientBuilder httpClientBuilder) ->
                                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

                        current = new RestHighLevelClient(builder);
                        client = current;
                    } catch (Exception e) {
                        logger.error("EsClient建立失敗...." + client, e);
                    }
                }
            }
        }
        return current;
    }

    /**
     * Closes the shared bulk processor (if one was created) and then the shared client,
     * and clears both cached references so that later calls to {@link #getClient()} or
     * {@link #getBulkProcessor()} build new instances instead of returning closed ones.
     *
     * <p>May block for up to {@value #BULK_CLOSE_TIMEOUT_SECONDS} seconds while pending
     * bulk requests are flushed. Calling it when nothing is open is a no-op.
     */
    public static void closeClient() {
        synchronized (EsClientUtil.class) {
            // The processor sends through the client, so it has to be drained first.
            BulkProcessor processor = bulkProcessor;
            if (processor != null) {
                bulkProcessor = null;
                try {
                    if (!processor.awaitClose(BULK_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        logger.warn("ES BulkProcessor did not finish pending requests within {} seconds",
                                BULK_CLOSE_TIMEOUT_SECONDS);
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and carry on closing the client.
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while closing ES BulkProcessor", e);
                }
            }

            RestHighLevelClient current = client;
            if (current == null) {
                return;
            }
            client = null;
            try {
                current.close();
                logger.info("ES Client 關閉成功...");
            } catch (Exception e) {
                logger.error("ES Client關閉失敗...", e);
            }
        }
    }

    /**
     * Indexes a single document. Failures are logged, not thrown.
     *
     * @param index target index name
     * @param id    document id
     * @param m     document source as field name to value
     */
    private static void saveData(String index, String id, Map<String, Object> m) {
        RestHighLevelClient esClient = getClient();
        if (esClient == null) {
            logger.error("ES client unavailable; document [{}] was not saved to index [{}]", id, index);
            return;
        }
        try {
            IndexRequest indexRequest = new IndexRequest(index)
                    .id(id)
                    .source(m);
            esClient.index(indexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            logger.error("Failed to save document [{}] to index [{}]", id, index, e);
        }
    }

    /**
     * Returns the shared {@link BulkProcessor}, creating it on first use on top of
     * {@link #getClient()}.
     *
     * @return the shared processor, or {@code null} if it could not be created
     *         (the failure is logged)
     */
    public static BulkProcessor getBulkProcessor() {
        BulkProcessor current = bulkProcessor;
        if (current == null) {
            synchronized (EsClientUtil.class) {
                current = bulkProcessor;
                if (current == null) {
                    try {
                        current = bulkProcessor(getClient());
                        bulkProcessor = current;
                    } catch (Exception e) {
                        logger.error("BulkProcessor建立失敗...." + bulkProcessor, e);
                    }
                }
            }
        }
        return current;
    }

    /**
     * Builds a new {@link BulkProcessor} that sends its batches through the given client.
     *
     * <p>A batch is flushed when it reaches 20000 actions or 10 MB, or every 300 seconds,
     * whichever comes first. Up to 5 bulk requests may be in flight concurrently, and
     * rejected requests are retried with exponential backoff (100 ms initial delay,
     * 3 retries).
     *
     * @param client client used to execute the bulk requests; must not be {@code null}
     * @return a new, independent bulk processor
     * @throws NullPointerException if {@code client} is {@code null}
     */
    public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
        // Fail here rather than later inside an asynchronous flush.
        Objects.requireNonNull(client, "client");

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Runs before each bulk request is sent.
                int numberOfActions = request.numberOfActions();
                logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Runs after a bulk request that received a response.
                if (response.hasFailures()) {
                    logger.error("ES Bulk [{}] executed with failures: {}",
                            executionId, response.buildFailureMessage());
                } else {
                    logger.info("ES Bulk [{}] completed in {} milliseconds ",
                            executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Runs when the whole bulk request failed.
                logger.error("ES Bulk Failed to execute bulk ", failure);
            }
        };

        return BulkProcessor.builder(
                        (request, bulkListener) ->
                                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                        listener)
                // Flush after this many actions.
                .setBulkActions(20000)
                // Flush after this much payload.
                .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                // Flush at this fixed interval regardless of size.
                .setFlushInterval(TimeValue.timeValueSeconds(300))
                // Number of bulk requests allowed in flight at once.
                .setConcurrentRequests(5)
                // Retry policy for rejected requests.
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }

    /**
     * Manual smoke test: indexes one fixed sample document into {@code global_house_list}
     * and closes the client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String id = "1596449389932";
        Map<String, Object> m = new HashMap<String, Object>(16);
        m.put("id", id);
        m.put("area_id", 1);
        m.put("camera_id", 1);
        m.put("log_time", new Date().toString());
        m.put("age", 1);
        try {
            EsClientUtil.saveData("global_house_list", id, m);
        } finally {
            // Always release the client's connections and threads, even on failure.
            EsClientUtil.closeClient();
        }
    }
}