springBoot上elasticsearch連線池設計(Jest)
阿新 • • 發佈:2018-12-13
開篇
- 本篇文章是使用Jest的Api來接入elasticsearch,網上說的Jest不需要連線池,檢視原始碼會發現,JestClient確實是有一個非同步方法(executeAsync()),但是該方法會額外建立一個執行緒去執行搜尋任務,有些類似OkHttp,在高併發高負載的系統上,就會額外的增加系統開銷,上述僅個人想法。在這裡,我講述一種通過定義連線池的方式去優化我們的elasticsearch搜尋任務。
思路
- aop監控es搜尋請求
- before 通過holder取得connection
- after 通過holder將該connection回收
- ThreadLocal 來執行緒繫結connection
相關類的作用
- aop
- EsClientAop
- EsJestClientAnn
- holder
- EsClientHolder
- pool
- EsClientPool
程式碼邏輯
EsClientPool
該類採用固定長度的方式,在類載入的時候就初始化了設定的連線數,這裡通過BlockingQueue阻塞佇列的生產-消費方式來保證多執行緒環境下的安全。其中Factory和Queue都是單利模式,get和remove都是包級別的訪問許可權。
/**
* Created by yzz on 2018/9/15.
*/
public class EsClientPool {
//阻塞佇列
private static BlockingQueue<JestClient> queue;
//使用者自定義配置的Bean
private static EsConfigVO esConfigVO;
//Jest工廠,這裡是單利工廠
private static JestClientFactory factory;
//保證factory queue 的單利
static {
esConfigVO = (EsConfigVO) ApplicationHelper.getBeanByClass(EsConfigVO.class);
queue = new ArrayBlockingQueue<>(esConfigVO.getMaxConnection());
factory = new JestClientFactory();
init();
}
//初始化工廠
private static void init(){
try {
//初始化工廠
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create();
HttpClientConfig clientConfig = new HttpClientConfig
.Builder(esConfigVO.getUrl())
.gson(gson)
.build();
factory.setHttpClientConfig(clientConfig);
//初始化阻塞佇列
//初始化連線
initQueue();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 初始化連線
* @throws InterruptedException
*/
static void initQueue() throws InterruptedException {
int i = 0;
int maxConnection = esConfigVO.getMaxConnection();
while(i<maxConnection){
JestClient client = factory.getObject();
queue.put(client);
i++;
}
System.err.println("es 初始化完成");
}
static JestClient get() throws InterruptedException {
return queue.poll(1, TimeUnit.SECONDS);
}
static void remove(JestClient jestClient) throws InterruptedException {
queue.put(jestClient);
}
}
EsClientHolder
通過ThreadLocal來保證執行緒區域性變數,實現多執行緒下的資源安全。該holder類和EsClientPool 在同一個包下。
/**
* Created by yzz on 2018/9/15.
*/
public class EsClientHolder {
private static ThreadLocal<JestClient> threadLocal = new ThreadLocal<>();
static void set(){
try {
JestClient jestClient = EsClientPool.get();
threadLocal.set(jestClient);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static JestClient get() throws Exception {
JestClient client = threadLocal.get();
System.out.println("當前執行緒:"+Thread.currentThread().getName()+" 當前ES:"+client);
if (null == client) {
throw new Exception("please try again !");
}
return client;
}
static void remove(){
try {
JestClient client = threadLocal.get();
EsClientPool.remove(client);
System.out.println("當前執行緒:"+Thread.currentThread().getName()+" 回收ES:"+client);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
AOP
通過對特定註解的監控,來做到在es搜尋執行前後對connection做執行緒繫結和回收
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface EsJestClientAnn {
}
@Component
@Aspect
@Order(-100)
public class EsClientAop {
@Pointcut(value = "execution(public * com.yzz.boot.es.service ..*.*(..))")
public void defaultJestClient(){}
@Before(value = "defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)")
public void initClient(){
EsClientHolder.set();
}
@After(value = "defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)")
public void recycle(){
EsClientHolder.remove();
}
}
EsConfigVO
該註解運用了springBoot的特性通過配置檔案來填充該Bean。
@Component
@ConfigurationProperties(prefix = "es.config.common")
public class EsConfigVO {
private String url;
private int maxConnection = 10;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public int getMaxConnection() {
return maxConnection;
}
public void setMaxConnection(int maxConnection) {
this.maxConnection = maxConnection;
}
}
配置檔案
es:
config:
common:
url: http://192.168.1.12:9200
maxConnection: 100
運用
@Service(value = "es_EsSearchService")
public class EsSearchService {
@EsJestClientAnn
public Object test() throws Exception {
JestClient client = EsClientHolder.get();
NodesStats nodesStats = new NodesStats.Builder().build();
JestResult result = client.execute(nodesStats);
return result.getJsonString();
}
}
總結
在本次實踐中,採用的pool方式是全部載入的方式,通過懶載入的方式會更好,下次會及時記錄下來。該pool的設計優點是,在第一次訪問的時候才會去初始化,優點懶載入的意思,也會避免es服務未啟動導致專案啟動失敗的問題。