elasticsearch原始碼分析---threadpool模組
阿新 • • 發佈:2019-01-07
elasticsearch的執行緒池實現在org.elasticsearch.threadpool下。
初始化過程中會載入以threadpool開頭的配置項的配置資訊,然後確定各個執行緒池的大小,預設情況下,會參照處理器個數進行設定:
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
其中像INDEX/BULK/GET/SUGGEST/PERCOLATOR等這些執行緒池的大小都設定成了availableProcessor
SEARCH執行緒池大小設定為availableProcessor*3
而FLUSH/SNAPSHOT/OPTIMIZE等這些執行緒池大小設定成halfProcMaxAt5
REFRESH執行緒池則設定成halpProcMaxAt10.
elasticsearch對threadpool的設定思路是相當合理的,一個核對應一個執行緒,減少context switch。而且,物件FLUSH等這些執行緒池的大小也做了限制。
根據這些設定生成executor了。
執行緒池分為cache fix scaling三類,具體設定不再解釋,可以瞭解下java的執行緒池相關知識。
除了es固定的執行緒池之外,還可以自定義執行緒池。
當然,以上預設值都可以在設定中進行更改,build方法的介面就說明了這一點:
private ExecutorHolder build(String name, @Nullable Settings settings, Settings defaultSettings) {
return rebuild(name, null, settings, defaultSettings);
}
其中defaultSetting是上邊執行緒池的預設設定,而settinng則是從配置檔案中載入的資訊,最終會以setting的設定為準。
build會呼叫rebuild函式去建立ExecutorHolder,其中SAME的thread是不能更改的,然後依次對cache fix scaling進行對應的設定。
從rebuild的函式介面就可以看出,rebuild不僅僅用於建立(第二個引數設定為null),還用於更新執行緒池的資訊。
stat會得到當前各個執行緒池的統計資訊。
除了以上固有執行緒,還註冊了一個定期執行器:
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
可以通過以下介面在其他模組註冊執行的物件:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
}
比如在SearchService模組中,有如下注冊程式碼:
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
總體來看,elasticsearch提供的執行緒池模組是對java threadpool的進一步封裝,如果對java的threadpool比較熟悉的話,這個模組還是比較容易理解。