Elasticsearch原始碼分析 | 單節點的啟動和關閉
本文主要簡要介紹Elasticsearch單節點的啟動和關閉流程。Elasticsearch版本:6.3.2
相關文章
1、Google Guice 快速入門
2、Elasticsearch 中的 Guice
3、教你編譯除錯Elasticsearch 6.3.2原始碼
4、Elasticsearch 6.3.2 啟動過程
建立節點
Elasticsearch的啟動引導類為 Bootstrap 類,在建立節點 Node 物件之前,Bootstrap 會解析配置和進行一些安全檢查等
environment 物件主要是解析出來的配置資訊
建立節點過程的主要工作是建立各個模組物件和服務物件,完成 Guice 依賴繫結
ModulesBuilder 用於統一管理 Module
ModulesBuilder modules = new ModulesBuilder(); ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); // 將模組加入管理 //.... // 例項繫結 modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry); b.bind(PluginsService.class).toInstance(pluginsService); b.bind(Client.class).toInstance(client); b.bind(NodeClient.class).toInstance(client); b.bind(Environment.class).toInstance(this.environment); b.bind(ThreadPool.class).toInstance(threadPool); b.bind(NodeEnvironment.class).toInstance(nodeEnvironment); // .... } ); injector = modules.createInjector(); // 生成注入器
主要的服務類簡介如下:
服務 | 簡介 |
---|---|
ResourceWatcherService | 通用資源監視服務 |
HttpServerTransport | HTTP傳輸服務,提供Rest介面服務 |
SnapshotsService | 快照服務 |
SnapshotShardsService | 負責啟動和停止shard級快照 |
IndicesClusterStateService | 根據收到的叢集狀態資訊,處理相關索引 |
Discovery | 叢集拓撲管理 |
RoutingService | 處理路由(節點之間遷移shard) |
ClusterService | 叢集管理服務,主要處理叢集任務,釋出叢集狀態 |
NodeConnectionsService | 節點連線管理服務 |
MonitorService | 提供程序級、系統級、檔案系統和JVM的監控服務 |
GatewayService | 負責叢集元資料持久化與恢復 |
SearchService | 處理搜尋請求 |
TransportService | 底層傳輸服務 |
plugins | 外掛 |
IndicesService | 負責建立、刪除索引等索引操作 |
啟動節點
啟動節點的主要工作是啟動各個模組的服務物件,服務物件從注入器 injector
中取出來,然後呼叫它們的 start
方法,服務物件的 start
方法的工作基本是初始化內部資料、建立執行緒池、啟動執行緒池等,詳細的流程留到後面的文章中再介紹。
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
在啟動 Discovery 和 ClusterService 之前,還會呼叫 validateNodeBeforeAcceptingRequests 方法來檢測環境外部,外部環境主要是JVM、作業系統相關引數,將一些影響效能的配置標記為錯誤以引起使用者的重視。
環境檢測
節點的環境檢測程式碼都封裝在 BootstrapChecks 類中,BootstrapChecks 類通過責任鏈模式對十幾個檢測項進行檢測,關於責任鏈模式可以翻看這篇文章《設計模式之責任鏈模式及典型應用》
這裡的責任鏈模式中的抽象處理者由 BootstrapCheck 介面扮演,它定義了一個處理方法 check
,而每個檢查項則是具體處理者,都有對應的一個靜態類,具體的檢查則在 check
介面中完成
以第一個檢查項 “堆大小檢查” 為例,從 JvmInfo 類中獲取配置的堆的初始值和最大值進行比較,不相等則格式化提示資訊,最後返回檢查結果
static class HeapSizeCheck implements BootstrapCheck {
@Override
public BootstrapCheckResult check(BootstrapContext context) {
final long initialHeapSize = getInitialHeapSize();
final long maxHeapSize = getMaxHeapSize();
if (initialHeapSize != 0 && maxHeapSize != 0 && initialHeapSize != maxHeapSize) {
final String message = String.format(Locale.ROOT,
"initial heap size [%d] not equal to maximum heap size [%d]; " +
"this can cause resize pauses and prevents mlockall from locking the entire heap",
getInitialHeapSize(), getMaxHeapSize());
return BootstrapCheckResult.failure(message);
} else {
return BootstrapCheckResult.success();
}
}
long getInitialHeapSize() {
return JvmInfo.jvmInfo().getConfiguredInitialHeapSize();
}
long getMaxHeapSize() {
return JvmInfo.jvmInfo().getConfiguredMaxHeapSize();
}
}
把所有檢查項的物件新增到一個 List 鏈中
static List<BootstrapCheck> checks() {
final List<BootstrapCheck> checks = new ArrayList<>();
checks.add(new HeapSizeCheck());
final FileDescriptorCheck fileDescriptorCheck
= Constants.MAC_OS_X ? new OsXFileDescriptorCheck() : new FileDescriptorCheck();
checks.add(fileDescriptorCheck);
checks.add(new MlockallCheck());
if (Constants.LINUX) {
checks.add(new MaxNumberOfThreadsCheck());
}
if (Constants.LINUX || Constants.MAC_OS_X) {
checks.add(new MaxSizeVirtualMemoryCheck());
}
if (Constants.LINUX || Constants.MAC_OS_X) {
checks.add(new MaxFileSizeCheck());
}
if (Constants.LINUX) {
checks.add(new MaxMapCountCheck());
}
checks.add(new ClientJvmCheck());
checks.add(new UseSerialGCCheck());
checks.add(new SystemCallFilterCheck());
checks.add(new OnErrorCheck());
checks.add(new OnOutOfMemoryErrorCheck());
checks.add(new EarlyAccessCheck());
checks.add(new G1GCCheck());
checks.add(new AllPermissionCheck());
return Collections.unmodifiableList(checks);
}
for 迴圈分別呼叫 check 方法進行檢查,有些檢查項檢查不通過是可以忽略的,如果有不能忽略的錯誤則會丟擲異常
for (final BootstrapCheck check : checks) {
final BootstrapCheck.BootstrapCheckResult result = check.check(context);
if (result.isFailure()) {
if (!(enforceLimits || enforceBootstrapChecks) && !check.alwaysEnforce()) {
ignoredErrors.add(result.getMessage());
} else {
errors.add(result.getMessage());
}
}
}
那麼檢查項有哪些呢?
堆大小檢查
:如果開啟了bootstrap.memory_lock
,則JVM在啟動時將鎖定堆的初始大小,若配置的初始值與最大值不等,堆變化後無法保證堆都鎖定在記憶體中檔案描述符檢查
:ES程序需要非常多的檔案描述符,所以須配置系統的檔案描述符的最大數量ulimit -n 65535
記憶體鎖定檢查
:ES允許程序只使用實體記憶體,若使用交換分割槽可能會帶來很多問題,所以最好讓ES鎖定記憶體最大執行緒數檢查
:ES程序會建立很多執行緒,這個數最少需2048最大虛擬記憶體檢查
最大檔案大小檢查
:段檔案和事務日誌檔案可能會非常大,建議這個數設定為無限虛擬記憶體區域最大數量檢查
JVM Client模式檢查
序列收集檢查
:ES預設使用 CMS 垃圾回收器,而不是 Serial 收集器系統呼叫過濾器檢查
OnError與OnOutOfMemoryError檢查
Early-access檢查
:ES最好執行在JVM的穩定版本上G1GC檢查
順便一提,JvmInfo 則是利用了 JavaSDK 自帶的 ManagementFactory 類來獲取JVM資訊的,獲取的 JVM 屬性如下所示
long pid; // 程序ID
String version; // Java版本
String vmName; // JVM名稱
String vmVersion; // JVM版本
String vmVendor; // JVM開發商
long startTime; // 啟動時間
long configuredInitialHeapSize; // 配置的堆的初始值
long configuredMaxHeapSize; // 配置的堆的最大值
Mem mem; // 記憶體資訊
String[] inputArguments; // JVM啟動時輸入的引數
String bootClassPath;
String classPath;
Map<String, String> systemProperties; // 系統環境變數
String[] gcCollectors;
String[] memoryPools;
String onError;
String onOutOfMemoryError;
String useCompressedOops;
String useG1GC; // 是否使用 G1 垃圾回收器
String useSerialGC; // 是否使用 Serial 垃圾回收器
keepAlive 執行緒
在啟動引導類 Bootstrap 的 start 方法中,啟動節點之後還會啟動一個 keepAlive 執行緒
private void start() throws NodeValidationException {
node.start();
keepAliveThread.start();
}
// CountDownLatch 初始值為 1
private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
Bootstrap() {
keepAliveThread = new Thread(new Runnable() {
@Override
public void run() {
try {
keepAliveLatch.await(); // 一直等待直到 CountDownLatch 減為 0
} catch (InterruptedException e) {
// bail out
}
}
}, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
keepAliveThread.setDaemon(false); // false 使用者執行緒
// keep this thread alive (non daemon thread) until we shutdown
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// 當程序收到關閉 SIGTERM 或 SIGINT 訊號時,CountDownLatch 減1
keepAliveLatch.countDown();
}
});
}
if (addShutdownHook) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
}
}
});
}
keepAliveThread 執行緒本身不做具體的工作。主執行緒執行完啟動流程後會退出,keepAliveThread 執行緒是唯一的使用者執行緒,作用是保持程序執行。在Java程式中,一個程序至少需要有一個使用者執行緒,當用戶執行緒為零時將退出程序。
做個試驗,將 keepAliveThread.setDaemon(false);
中的 false
改為 true
,會發現Elasticsearch啟動後馬上就停止了
[2019-01-08T01:28:47,522][INFO ][o.e.n.Node ] [1yGidog] started
[2019-01-08T01:28:47,525][INFO ][o.e.n.Node ] [1yGidog] stopping ...
關閉節點
關閉的順序大致為:
- 關閉快照和HTTPServer,不再響應使用者REST請求
- 關閉叢集拓撲管理,不再響應ping請求
- 關閉網路模組,讓節點離線
- 執行各個外掛的關閉流程
- 關閉IndicesService,這期間需要等待釋放的資源最多,時間最長
public static void close(final Exception ex, final Iterable<? extends Closeable> objects) throws IOException {
Exception firstException = ex;
for (final Closeable object : objects) {
try {
if (object != null) {
object.close();
}
} catch (final IOException | RuntimeException e) {
if (firstException == null) {
firstException = e;
} else {
firstException.addSuppressed(e);
}
}
}
// ...
}
private Node stop() {
if (!lifecycle.moveToStopped()) {
return this;
}
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("stopping ...");
injector.getInstance(ResourceWatcherService.class).stop();
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).stop();
}
injector.getInstance(SnapshotsService.class).stop();
injector.getInstance(SnapshotShardsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// close discovery early to not react to pings anymore.
// This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
injector.getInstance(Discovery.class).stop();
// we close indices first, so operations won't be allowed on it
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
nodeService.getMonitorService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
// if we had scroll searchers etc or recovery going on we wait for to finish.
injector.getInstance(IndicesService.class).stop();
logger.info("stopped");
return this;
}
節點的關閉當然沒那麼簡單。更多細節敬請期待。
參考:
張超.Elasticsearch原始碼解析與優化實戰
後記
歡迎評論、轉發、分享,您的支援是我最大的動力
更多內容可訪問我的個人部落格:http://laijianfeng.org
關注【小旋鋒】微信公眾號,及時接收博文推送