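[Elasticsearch 5.6.12 Source Code] Part 3: Startup Process Analysis (II)
Copyright notice: this is an original article by the blogger. Please credit the source when reposting.
Introduction
This article answers the following question:
1. Which services does the Node object initialize during ES startup?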
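Construction flow
Step 1. Create a List that holds the resources to release if initialization fails, and use a temporary Logger object to log that initialization has started.
A List<Closeable> is created first, and then the log message initializing ... is written. The code is fairly simple: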
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
{
    // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
    Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
    logger.info("initializing ...");
}
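The purpose of resourcesToClose and the success flag is easier to see in a complete, self-contained sketch. The example below uses only the JDK. The resource names and the closeQuietly helper are hypothetical stand-ins, and the real constructor has far more steps. It only illustrates the pattern: register each resource as soon as it exists, then release everything registered so far if a later step fails.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class ResourceCleanupSketch {

    /** Records which resources were closed, for demonstration only. */
    static final List<String> closed = new ArrayList<>();

    /** Hypothetical resource that records its name when it is closed. */
    static Closeable resource(String name) {
        return () -> closed.add(name);
    }

    /** Returns true if construction succeeded; on failure, everything registered so far is closed. */
    static boolean construct(boolean failHalfway) {
        final List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            resourcesToClose.add(resource("nodeEnvironment"));
            resourcesToClose.add(resource("threadPool"));
            if (failHalfway) {
                throw new IllegalStateException("simulated failure");
            }
            resourcesToClose.add(resource("clusterService"));
            success = true;
        } catch (IllegalStateException e) {
            // swallowed only so that the demo can report the outcome
        } finally {
            if (!success) {
                closeQuietly(resourcesToClose);
            }
        }
        return success;
    }

    /** Closes every resource in registration order, ignoring close failures. */
    static void closeQuietly(List<Closeable> resources) {
        for (Closeable c : resources) {
            try {
                c.close();
            } catch (IOException ignored) {
                // best effort: keep closing the remaining resources
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(construct(true) + " " + closed);
    }
}
```

In the failing case only the two resources registered before the failure are closed. In the successful case nothing is closed, because the finished object now owns the resources.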
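Step 2. Force client.type in settings to node, set node.name, and check the configuration of the index data directory.
This part first sets client.type to node. It then calls the processSettings method of TribeService to handle the "tribe" configuration, creates the NodeEnvironment, and checks and sets the node.name property. Finally it checks the configured index data Path when needed and prints some JVM information. The code is as follows: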
Settings tmpSettings = Settings.builder().put(environment.settings())
    .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
tmpSettings = TribeService.processSettings(tmpSettings);
// create the node environment as soon as possible, to recover the node id and enable logging
try {
    nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
    resourcesToClose.add(nodeEnvironment);
} catch (IOException ex) {
    throw new IllegalStateException("Failed to create node environment", ex);
}
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
Logger logger = Loggers.getLogger(Node.class, tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
if (DiscoveryNode.nodeRequiresLocalStorage(tmpSettings)) {
    checkForIndexDataInDefaultPathData(tmpSettings, nodeEnvironment, logger);
}
// this must be captured after the node name is possibly added to the settings
final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
if (hadPredefinedNodeName == false) {
    logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());
} else {
    logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
}
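The listing above does not show the body of addNodeNameIfNeeded, so the following is only a rough sketch of what "derive the node name from the node ID" could look like. It uses a plain Map instead of the Settings class. The 7-character prefix is an assumption made for illustration, not something the listing confirms.

```java
import java.util.HashMap;
import java.util.Map;

final class NodeNameSketch {

    static final String NODE_NAME_KEY = "node.name";

    /** Returns a copy of the settings in which node.name is filled in from the node ID when absent. */
    static Map<String, String> addNodeNameIfNeeded(Map<String, String> settings, String nodeId) {
        Map<String, String> result = new HashMap<>(settings);
        if (!result.containsKey(NODE_NAME_KEY)) {
            // assumption: a short prefix of the node ID serves as the default name
            result.put(NODE_NAME_KEY, nodeId.substring(0, Math.min(7, nodeId.length())));
        }
        return result;
    }

    /** Builds the same two log messages as the listing, depending on whether a name was predefined. */
    static String describe(Map<String, String> original, String nodeId) {
        boolean hadPredefinedNodeName = original.containsKey(NODE_NAME_KEY);
        String nodeName = addNodeNameIfNeeded(original, nodeId).get(NODE_NAME_KEY);
        if (!hadPredefinedNodeName) {
            return "node name [" + nodeName + "] derived from node ID [" + nodeId
                + "]; set [" + NODE_NAME_KEY + "] to override";
        }
        return "node name [" + nodeName + "], node ID [" + nodeId + "]";
    }

    public static void main(String[] args) {
        System.out.println(describe(new HashMap<>(), "q1W2e3R4t5Y6u7I8"));
    }
}
```

As in the original code, hadPredefinedNodeName has to be captured before the name is added. Otherwise the log message could not tell an explicit name from a derived one.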
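Step 3. Create the PluginsService and Environment instances.
The PluginsService constructor loads the jars under the plugins and modules directories and creates the corresponding plugin and module instances. The Node constructor then calls the updatedSettings method of pluginsService to obtain the settings defined by the plugins and modules. Next, Node uses the new settings and the nodeId to create a LocalNodeFactory, and re-creates the Environment object from the latest settings. The code is as follows: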
this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
this.settings = pluginsService.updatedSettings();
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
// create the environment based on the finalized (processed) view of the settings
// this is just to makes sure that people get the same settings, no matter where they ask them from
this.environment = new Environment(this.settings);
Environment.assertEquivalent(environment, this.environment);
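One way to picture what updatedSettings returns is a merge of the settings contributed by each plugin with the node's own settings. The sketch below is a simplified model built on plain maps. The SettingsProvider interface, the rejection of duplicate plugin keys, and the rule that the node's settings win are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PluginSettingsSketch {

    /** Hypothetical stand-in for a plugin that contributes extra settings. */
    interface SettingsProvider {
        Map<String, String> additionalSettings();
    }

    /** Merges plugin-provided settings with the node's settings; the node's values take precedence. */
    static Map<String, String> updatedSettings(Map<String, String> nodeSettings, List<SettingsProvider> plugins) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (SettingsProvider plugin : plugins) {
            for (Map.Entry<String, String> e : plugin.additionalSettings().entrySet()) {
                // assumption: two plugins may not define the same additional setting
                if (merged.putIfAbsent(e.getKey(), e.getValue()) != null) {
                    throw new IllegalArgumentException(
                        "additional setting [" + e.getKey() + "] is defined by more than one plugin");
                }
            }
        }
        merged.putAll(nodeSettings); // explicit node configuration overrides plugin defaults
        return merged;
    }

    public static void main(String[] args) {
        SettingsProvider plugin = () -> Collections.singletonMap("example.plugin.enabled", "true");
        Map<String, String> node = Collections.singletonMap("node.name", "es-01");
        System.out.println(updatedSettings(node, Arrays.asList(plugin)));
    }
}
```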
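Step 4. Create the ThreadPool and ThreadContext instances.
First, the list of ExecutorBuilder objects provided by the plugins and modules is obtained through pluginsService. The ThreadPool and ThreadContext instances are then created from settings and that list. The code is as follows: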
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
DeprecationLogger.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
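To make the idea of a pool of named executors plus a bounded-time terminate concrete, here is a minimal sketch that uses only java.util.concurrent. The class name, the pool names, and the sizes are hypothetical. The real ThreadPool builds its executors from ExecutorBuilder objects, which this sketch does not model.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class NamedThreadPoolSketch {

    private final Map<String, ExecutorService> executors = new LinkedHashMap<>();

    /** Creates one fixed-size executor per entry (pool name -> number of threads). */
    NamedThreadPoolSketch(Map<String, Integer> poolSizes) {
        for (Map.Entry<String, Integer> e : poolSizes.entrySet()) {
            executors.put(e.getKey(), Executors.newFixedThreadPool(e.getValue()));
        }
    }

    ExecutorService executor(String name) {
        ExecutorService executor = executors.get(name);
        if (executor == null) {
            throw new IllegalArgumentException("no executor found for [" + name + "]");
        }
        return executor;
    }

    /** Shuts down every executor and waits up to the timeout; returns true if all of them terminated. */
    static boolean terminate(NamedThreadPoolSketch pool, long timeout, TimeUnit unit) {
        for (ExecutorService executor : pool.executors.values()) {
            executor.shutdown();
        }
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        boolean allTerminated = true;
        try {
            for (ExecutorService executor : pool.executors.values()) {
                long remaining = Math.max(0L, deadline - System.nanoTime());
                allTerminated &= executor.awaitTermination(remaining, TimeUnit.NANOSECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return allTerminated;
    }

    /** Runs one task on the "search" pool, then shuts everything down. */
    static int demo() {
        Map<String, Integer> sizes = new LinkedHashMap<>();
        sizes.put("search", 2);
        sizes.put("bulk", 1);
        NamedThreadPoolSketch pool = new NamedThreadPoolSketch(sizes);
        try {
            Callable<Integer> task = () -> 21 * 2;
            return pool.executor("search").submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            return -1;
        } finally {
            terminate(pool, 10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Registering the terminate call as a Closeable, as the listing does with resourcesToClose, means a failed construction does not leave worker threads running.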
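Step 5. Create the main modules in order: NodeClient, ResourceWatcherService, ScriptModule, AnalysisModule, SettingsModule, NetworkService, ClusterService, IngestService, ClusterInfoService, and others.
ScriptModule holds the ScriptService, through which instances of the script engines configured in ES can be obtained. AnalysisModule holds the AnalysisRegistry object, through which instances of the analyzers configured in ES can be obtained. SettingsModule stores, by type, the setting objects that ES is able to parse. NetworkService is mainly used to resolve network addresses, and ClusterService maintains the cluster information. The code is as follows: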
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
client = new NodeClient(settings, threadPool);
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
final ScriptModule scriptModule = ScriptModule.create(settings, this.environment, resourceWatcherService,
pluginsService.filterPlugins(ScriptPlugin.class));
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
additionalSettings.addAll(scriptModule.getSettings());
// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
// so we might be late here already
final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(settings,
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
localNodeFactory::getNode);
clusterService.addStateApplier(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
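The comment in the listing says this is the earliest point at which settings can be validated. By then the settings registered by plugins, by the thread pool builders, and by ScriptModule have all been collected into additionalSettings. The sketch below shows the basic idea of validating against a registry. It checks only whether each key is known, which is a deliberate simplification. The class name, method names, and error message are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SettingsValidationSketch {

    /** Throws if the configuration contains a key that nobody registered. */
    static void validate(Set<String> registeredKeys, Map<String, String> settings) {
        for (String key : settings.keySet()) {
            if (!registeredKeys.contains(key)) {
                throw new IllegalArgumentException("unknown setting [" + key + "]");
            }
        }
    }

    /** Convenience wrapper that reports validity as a boolean. */
    static boolean isValid(Set<String> registeredKeys, Map<String, String> settings) {
        try {
            validate(registeredKeys, settings);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // built-in keys plus keys contributed by plugins and executor builders
        Set<String> registered = new HashSet<>(Arrays.asList("node.name", "thread_pool.search.size"));
        Map<String, String> settings = new HashMap<>();
        settings.put("node.name", "es-01");
        settings.put("node.nmae", "misspelled key");
        System.out.println(isValid(registered, settings));
    }
}
```

A check like this can only run once every contributor has registered its settings. That is why it comes after the plugins, the thread pool, and the script module have been created.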
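Step 6. Create the ModulesBuilder and add the various Modules.
ES uses Guice, Google's open-source library, to manage the dependencies in the program. The Modules added to the ModulesBuilder are: the Modules provided by plugins, obtained through PluginsService; NodeModule, which holds MonitorService; ClusterModule, which holds ClusterService and the related ClusterPlugin instances; IndicesModule, which holds MapperPlugin; SearchModule, which holds the related SearchPlugin; ActionModule, which holds ThreadPool, ActionPlugin, NodeClient, and CircuitBreakerService; GatewayModule; RepositoriesModule, which holds RepositoryPlugin; and SettingsModule, which holds the setting objects available in ES. Finally, the createInjector method of modules is called to create the application's dependency injector.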
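For readers unfamiliar with the "modules contribute bindings, the injector hands out instances" idea, the following hand-rolled sketch mimics the shape of that flow in plain Java. It is not the Guice API. BindingModule, Binder, Injector, and this ModulesBuilder are hypothetical types written for this example, and they support only binding a type to a ready-made instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TinyInjectorSketch {

    /** A module contributes bindings when the injector is created. */
    interface BindingModule {
        void configure(Binder binder);
    }

    /** Collects type-to-instance bindings and rejects duplicates. */
    static final class Binder {
        private final Map<Class<?>, Object> bindings = new HashMap<>();

        <T> void bind(Class<T> type, T instance) {
            if (bindings.putIfAbsent(type, instance) != null) {
                throw new IllegalStateException(type.getName() + " is already bound");
            }
        }
    }

    /** Looks up instances by type once all modules have been configured. */
    static final class Injector {
        private final Map<Class<?>, Object> bindings;

        Injector(Map<Class<?>, Object> bindings) {
            this.bindings = bindings;
        }

        <T> T getInstance(Class<T> type) {
            Object instance = bindings.get(type);
            if (instance == null) {
                throw new IllegalStateException("no binding for " + type.getName());
            }
            return type.cast(instance);
        }
    }

    /** Gathers modules first, then builds the injector in one step. */
    static final class ModulesBuilder {
        private final List<BindingModule> modules = new ArrayList<>();

        ModulesBuilder add(BindingModule... newModules) {
            modules.addAll(Arrays.asList(newModules));
            return this;
        }

        Injector createInjector() {
            Binder binder = new Binder();
            for (BindingModule module : modules) {
                module.configure(binder);
            }
            return new Injector(binder.bindings);
        }
    }

    public static void main(String[] args) {
        Injector injector = new ModulesBuilder()
            .add(b -> b.bind(String.class, "node-1"), b -> b.bind(Integer.class, 9300))
            .createInjector();
        System.out.println(injector.getInstance(String.class) + ":" + injector.getInstance(Integer.class));
    }
}
```

The later steps call injector.getInstance(SomeService.class) in the same way to fetch each service before starting it.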
Step 7. Collect the LifecycleComponent objects of each plugin and initialize NodeClient.
The code is as follows:
List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
() -> clusterService.localNode().getId());
if (NetworkModule.HTTP_ENABLED.get(settings)) {
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
logger.info("initialized");
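The filter-and-cast stream at the top of the listing can be tried in isolation. In the sketch below, LifecycleComponent is a simplified one-method stand-in for the real interface, and Watcher is a hypothetical component. The point is only that objects which do not implement the lifecycle interface are skipped, and that the resulting list is exposed as read-only.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class LifecycleFilterSketch {

    /** Simplified stand-in for the real lifecycle interface. */
    interface LifecycleComponent {
        void start();
    }

    /** Hypothetical plugin component that remembers whether it was started. */
    static final class Watcher implements LifecycleComponent {
        boolean started;

        @Override
        public void start() {
            started = true;
        }
    }

    /** Keeps only the components that take part in the lifecycle, as an unmodifiable list. */
    static List<LifecycleComponent> lifecycleComponents(Collection<Object> components) {
        return Collections.unmodifiableList(components.stream()
            .filter(c -> c instanceof LifecycleComponent)
            .map(c -> (LifecycleComponent) c)
            .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        Watcher watcher = new Watcher();
        List<LifecycleComponent> found = lifecycleComponents(Arrays.<Object>asList(watcher, "plain object"));
        found.forEach(LifecycleComponent::start);
        System.out.println(found.size() + " " + watcher.started);
    }
}
```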
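Step 8. Call the start method of Node, which in turn calls the start method of each important module.
The key services are started one after another. The code is as follows: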
// hack around dependency injection problem (for now...)
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
pluginLifecycleComponents.forEach(LifecycleComponent::start);
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(MonitorService.class).start();
final ClusterService clusterService = injector.getInstance(ClusterService.class);
final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);
// TODO hack around circular dependencies problems
injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.setDiscoverySettings(discovery.getDiscoverySettings());
clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());
clusterService.setClusterStatePublisher(discovery::publish);
// start before the cluster service since it adds/removes initial Cluster state blocks
final TribeService tribeService = injector.getInstance(TribeService.class);
tribeService.start();
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.start();
validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
clusterService.addStateApplier(transportService.getTaskManager());
clusterService.start();
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
// start after cluster service so the local disco is known
discovery.start();
transportService.acceptIncomingRequests();
discovery.startInitialJoin();
// tribe nodes don't have a master so we shouldn't register an observer
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
if (clusterState.nodes().getMasterNodeId() == null) {
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) { latch.countDown(); }
@Override
public void onClusterServiceClose() {
latch.countDown();
}
@Override
public void onTimeout(TimeValue timeout) {
logger.warn("timed out while waiting for initial discovery state - timeout: {}",
initialStateTimeout);
latch.countDown();
}
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
}
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).start();
}
if (WRITE_PORTS_FILE_SETTING.get(settings)) {
if (NetworkModule.HTTP_ENABLED.get(settings)) {
HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
writePortsFile("http", http.boundAddress());
}
TransportService transport = injector.getInstance(TransportService.class);
writePortsFile("transport", transport.boundAddress());
}
// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
logger.info("started");
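The "wait until a master is known, but give up after initialStateTimeout" part of the listing can be reduced to a small latch-based sketch. It swaps in a simpler technique than the original. The ClusterStateObserver listener counts the latch down on a new state, on close, or on timeout. Here CountDownLatch.await is called with a timeout directly, and a background thread plays the role of the election. All names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class InitialStateWaitSketch {

    /** Returns the master ID once it is known, or null if the timeout elapses first. */
    static String waitForMaster(AtomicReference<String> masterId, CountDownLatch elected, long timeoutMillis) {
        if (masterId.get() != null) {
            return masterId.get(); // already joined, nothing to wait for
        }
        try {
            boolean signalled = elected.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return signalled ? masterId.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Simulates startup with or without a master being elected in the background. */
    static String demo(boolean electMaster) {
        AtomicReference<String> masterId = new AtomicReference<>();
        CountDownLatch elected = new CountDownLatch(1);
        if (electMaster) {
            Thread election = new Thread(() -> {
                masterId.set("node-A");
                elected.countDown();
            });
            election.start();
        }
        return waitForMaster(masterId, elected, electMaster ? 5000 : 50);
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

In both the sketch and the original, a timeout does not abort startup. The original logs a warning and then goes on to start the HTTP transport.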