1. 程式人生 > >springBoot上elasticsearch連線池設計(Jest)

springBoot上elasticsearch連線池設計(Jest)

開篇

  • 本篇文章是使用Jest的Api來接入elasticsearch,網上說的Jest不需要連線池,檢視原始碼會發現,JestClient確實是有一個非同步方法(executeAsync()),但是該方法會額外建立一個執行緒去執行搜尋任務,有些類似OkHttp,在高併發高負載的系統上,就會額外的增加系統開銷,上述僅個人想法。在這裡,我講述一種通過定義連線池的方式去優化我們的elasticsearch搜尋任務。

思路

  • aop監控es搜尋請求
    • before 通過holder取得connection
    • after 通過holder將該connection回收
  • ThreadLocal 來執行緒繫結connection
相關類的作用
  • aop
    • EsClientAop
    • EsJestClientAnn
  • holder
    • EsClientHolder
  • pool
    • EsClientPool

程式碼邏輯

EsClientPool

該類採用固定長度的方式,在類載入的時候就初始化了設定的連線數,這裡通過BlockingQueue阻塞佇列的生產-消費方式來保證多執行緒環境下的安全。其中Factory和Queue都是單利模式,get和remove都是包級別的訪問許可權。

/**
 * Created by yzz on 2018/9/15.
 */
public class EsClientPool {
    //阻塞佇列
    private static BlockingQueue<JestClient> queue;
    //使用者自定義配置的Bean
    private static EsConfigVO esConfigVO;
    //Jest工廠,這裡是單利工廠
    private static JestClientFactory factory;

    //保證factory queue 的單利
    static {
        esConfigVO = (EsConfigVO) ApplicationHelper.getBeanByClass(EsConfigVO.class);
        queue = new
ArrayBlockingQueue<>(esConfigVO.getMaxConnection()); factory = new JestClientFactory(); init(); } //初始化工廠 private static void init(){ try { //初始化工廠 Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create(); HttpClientConfig clientConfig = new HttpClientConfig .Builder(esConfigVO.getUrl()) .gson(gson) .build(); factory.setHttpClientConfig(clientConfig); //初始化阻塞佇列 //初始化連線 initQueue(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 初始化連線 * @throws InterruptedException */ static void initQueue() throws InterruptedException { int i = 0; int maxConnection = esConfigVO.getMaxConnection(); while(i<maxConnection){ JestClient client = factory.getObject(); queue.put(client); i++; } System.err.println("es 初始化完成"); } static JestClient get() throws InterruptedException { return queue.poll(1, TimeUnit.SECONDS); } static void remove(JestClient jestClient) throws InterruptedException { queue.put(jestClient); } }

EsClientHolder

通過ThreadLocal來保證執行緒區域性變數,實現多執行緒下的資源安全。該holder類和EsClientPool 在同一個包下。

/**
 * Created by yzz on 2018/9/15.
 */
public class EsClientHolder {

    private static ThreadLocal<JestClient> threadLocal = new ThreadLocal<>();

    static void set(){
        try {
            JestClient jestClient = EsClientPool.get();
            threadLocal.set(jestClient);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static JestClient get() throws Exception {
        JestClient client = threadLocal.get();
        System.out.println("當前執行緒:"+Thread.currentThread().getName()+" 當前ES:"+client);
        if (null == client) {
            throw new Exception("please try again !");
        }
        return client;
    }

    static void remove(){
        try {
            JestClient client = threadLocal.get();
            EsClientPool.remove(client);
            System.out.println("當前執行緒:"+Thread.currentThread().getName()+" 回收ES:"+client);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

AOP

通過對特定註解的監控,來做到在es搜尋執行前後對connection做執行緒繫結和回收

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface EsJestClientAnn {

}
@Component
@Aspect
@Order(-100)
public class EsClientAop {

    @Pointcut(value = "execution(public * com.yzz.boot.es.service ..*.*(..))")
    public void defaultJestClient(){}

    @Before(value = "defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)")
    public void initClient(){
        EsClientHolder.set();
    }

    @After(value = "defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)")
    public void recycle(){
        EsClientHolder.remove();
    }

}

EsConfigVO

該註解運用了springBoot的特性通過配置檔案來填充該Bean。

@Component
@ConfigurationProperties(prefix = "es.config.common")
public class EsConfigVO {

    private String url;

    private int maxConnection = 10;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public int getMaxConnection() {
        return maxConnection;
    }

    public void setMaxConnection(int maxConnection) {
        this.maxConnection = maxConnection;
    }
}

配置檔案

es:
  config:
    common:
      url: http://192.168.1.12:9200
      maxConnection: 100

運用

@Service(value = "es_EsSearchService")
public class EsSearchService {

    @EsJestClientAnn
    public Object test() throws Exception {
        JestClient client = EsClientHolder.get();
        NodesStats nodesStats = new NodesStats.Builder().build();
        JestResult result = client.execute(nodesStats);
        return result.getJsonString();
    }
}

總結

在本次實踐中,採用的pool方式是全部載入的方式,通過懶載入的方式會更好,下次會及時記錄下來。該pool的設計優點是,在第一次訪問的時候才會去初始化,優點懶載入的意思,也會避免es服務未啟動導致專案啟動失敗的問題。