1. 程式人生 > >ElasticSearch中bulkProcesser使用遇到的問題

ElasticSearch中bulkProcesser使用遇到的問題

廢話不多說,直接上程式碼。在批量插入的時候,我設定當請求超過1000個(條件一),或者總大小超過1024MB時(條件二),或者當請求個數和大小都不滿足要求時我設定的是每30s(條件三)執行觸發批量提交動作。
但是當我實際執行的時候,180條資料幾乎就是瞬間完成的。也就是說我設定的三個條件並沒有生效。
有沒有大佬遇到過這種問題?我程式碼的問題在哪裡?

package com.yuguo.es.bulkapi;

/**

  • Elasticsearch的Bulk API允許批量提交index和delete請求。

  • @author : FangRonghao

  • @date : 2018年12月7日

  • @description
    */
    @Slf4j
    @RestController
    public class EsBulkApi {

    @Autowired
    private ElasticsearchConfig esConfig;

    /*es 索引 index/
    public static final String TEST_SEARCH_INDEX = “test.search”;
    /*es 型別 type/
    public static final String TEST_SEARCH_TYPE = “test_search”;

    /**

    • bulkapi
    • 在例子中,當請求超過50個(default=1000)或者總大小超過5MB(default=5MB)時,觸發批量提交動作。
      /
      public void bulkApiAction2(){
      EsClientPool pool = EsUtils.esClientPool(esConfig);
      EsClient esClient = pool.getEsClient();
      BulkProcessor bulkProcessor = null;
      SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
      try {
      bulkProcessor = BulkProcessor.builder(esClient.getClient(),
      new BulkProcessor.Listener() {
      /
      *
      * beforeBulk會在批量提交之前執行,可以從BulkRequest中獲取請求資訊request.requests()或者請求數量request.numberOfActions()。
      /
      @Override
      public void beforeBulk(long executionId, BulkRequest request) {
      // TODO Auto-generated method stub
      log.info("—嘗試插入{}條資料—", request.numberOfActions());
      }
      /
      *
      * 會在批量失敗後執行。
      /
      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
      // TODO Auto-generated method stub
      log.error("[es錯誤]—嘗試插入資料失敗—", failure);
      }
      /
      *
      * 會在批量成功後執行,可以跟beforeBulk配合計算批量所需時間。
      */
      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
      // TODO Auto-generated method stub
      boolean hasFailures = response.hasFailures();
      if(hasFailures){
      String buildFailureMessage = response.buildFailureMessage();
      log.info("—嘗試插入{}條資料失敗—", request.numberOfActions());
      log.info("—失敗原因—"+buildFailureMessage);
      } else{
      log.info("—嘗試插入{}條資料成功—", request.numberOfActions());
      }
      }
      })
      .setBulkActions(1000)
      .setBulkSize(new ByteSizeValue(1024, ByteSizeUnit.MB))
      .setFlushInterval(TimeValue.timeValueSeconds(30))
      .build();
      for(int i=0; i<180; i++){
      Map<String,Object> searchMap = new HashMap<String,Object>();
      searchMap.put(“userId”, i);
      searchMap.put(“userName”, “yuguo”+i);
      searchMap.put(“createTime”, sdf.format(new Date()));
      bulkProcessor.add(new IndexRequest(TEST_SEARCH_INDEX, TEST_SEARCH_TYPE, i+"").source(searchMap));
      }
      } catch (Exception e) {
      log.error(“bulk api action error”,e);
      } finally {
      if(bulkProcessor != null) bulkProcessor.close();
      pool.removeEsClient(esClient);
      }
      }

}