
【Elasticsearch 5.6.12 Source Code】——【3】Startup Process Analysis (Part 2)

Copyright notice: this post is the blogger's original work. Please credit the source when reposting!


Introduction

This article mainly addresses the following question:

1. Which services does the Node object initialize during the ES startup process?

Construction Flow

Step 1. Create a List to hold the resources that must be released if initialization fails, and use a temporary Logger to log that initialization has started.

First a List<Closeable> is created, and then the message initializing ... is logged. The code is fairly simple:

final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
 {
      // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
      Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
      logger.info("initializing ...");

 }
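For context, the sketch below shows the pattern this list supports: every resource created during construction is registered in resourcesToClose, and if construction fails everything registered so far is closed before the failure propagates (Node delegates the closing to a utility that swallows close-time exceptions, in the spirit of IOUtils.closeWhileHandlingException). This is a self-contained illustration, not the actual Node code.

 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

 public class CloseOnFailureSketch {
     public static void main(String[] args) {
         final List<Closeable> resourcesToClose = new ArrayList<>();
         boolean success = false;
         try {
             // each successfully initialized resource is registered so it can be released on failure
             resourcesToClose.add(() -> System.out.println("closing resource A"));
             resourcesToClose.add(() -> System.out.println("closing resource B"));
             // ... further initialization that might throw ...
             success = true;
         } finally {
             if (success == false) {
                 // close everything created so far, ignoring close-time failures
                 for (Closeable resource : resourcesToClose) {
                     try {
                         resource.close();
                     } catch (IOException e) {
                         // already failing; suppress
                     }
                 }
             }
         }
     }
 }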
Step 2. Force the client.type setting in settings to node, set node.name, and check the index data directory settings.

This part first sets client.type to node, then calls TribeService's processSettings method to handle the "tribe" settings, then creates the NodeEnvironment, checks and sets the node.name property, and finally, when the node requires local storage, checks the index data path configuration and prints some JVM information. The code is as follows:

 Settings tmpSettings = Settings.builder().put(environment.settings())
                .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

 tmpSettings = TribeService.processSettings(tmpSettings);

 // create the node environment as soon as possible, to recover the node id and enable logging
 try {
     nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
     resourcesToClose.add(nodeEnvironment);
 } catch (IOException ex) {
     throw new IllegalStateException("Failed to create node environment", ex);
 }
 final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
 Logger logger = Loggers.getLogger(Node.class, tmpSettings);
 final String nodeId = nodeEnvironment.nodeId();
 tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
 if (DiscoveryNode.nodeRequiresLocalStorage(tmpSettings)) {
    checkForIndexDataInDefaultPathData(tmpSettings, nodeEnvironment, logger);
 }
 // this must be captured after the node name is possibly added to the settings
 final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
 if (hadPredefinedNodeName == false) {
     logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());
 } else {
     logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
 }
Step 3. Create the PluginsService and Environment instances.

The PluginsService constructor loads the jar files under the plugins and modules directories and creates the corresponding plugin and module instances. Once they are created, the Node constructor calls pluginsService's updatedSettings method to obtain the settings defined by those plugins and modules. Next, Node creates the LocalNodeFactory from the new settings and the nodeId, and re-creates the Environment object from the latest settings. The code is as follows:

 this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
 this.settings = pluginsService.updatedSettings();
 localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

 // create the environment based on the finalized (processed) view of the settings
 // this is just to makes sure that people get the same settings, no matter where they ask them from
 this.environment = new Environment(this.settings);
 Environment.assertEquivalent(environment, this.environment);
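As a side note, pluginsService.updatedSettings() conceptually merges the additional settings contributed by each plugin and module with the node's own settings. The sketch below illustrates that merge under the assumption that the explicit node settings are applied last and therefore take precedence; the helper name is made up for illustration and is not the actual PluginsService code.

 import java.util.List;

 import org.elasticsearch.common.settings.Settings;

 public class UpdatedSettingsSketch {
     // Illustrative helper: combine plugin/module defaults with the node settings.
     static Settings mergePluginSettings(Settings nodeSettings, List<Settings> pluginContributions) {
         Settings.Builder builder = Settings.builder();
         for (Settings contributed : pluginContributions) {
             builder.put(contributed);   // plugin/module defaults go in first
         }
         builder.put(nodeSettings);      // explicit node settings are applied last, so they win
         return builder.build();
     }
 }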
Step 4. Create the ThreadPool and ThreadContext instances.

First, the list of ExecutorBuilder objects provided by the plugins and modules is obtained through pluginsService. Then the ThreadPool and ThreadContext instances are created based on settings and that ExecutorBuilder list. The code is as follows:

 // gather the ExecutorBuilders contributed by plugins/modules (the list mentioned above)
 final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(tmpSettings);
 final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
 resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
 // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
 DeprecationLogger.setThreadContext(threadPool.getThreadContext());
 resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
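The ThreadContext created along with the ThreadPool carries per-request headers and transient values for the code running on a thread. The following small usage sketch (an illustration based on the public ThreadContext API, not code taken from Node) shows how headers become visible on the current thread and disappear while the context is stashed.

 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;

 public class ThreadContextSketch {
     public static void main(String[] args) {
         ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
         threadContext.putHeader("_example_header", "value");               // visible to later code on this thread
         System.out.println(threadContext.getHeader("_example_header"));    // prints "value"
         try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
             // inside a stashed context the header is no longer visible
             System.out.println(threadContext.getHeader("_example_header")); // prints "null"
         }
         // the original context is restored after the try-with-resources block
         System.out.println(threadContext.getHeader("_example_header"));    // prints "value" again
     }
 }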
Step 5. Create the main modules in turn: NodeClient, ResourceWatcherService, ScriptModule, AnalysisModule, SettingsModule, NetworkService, ClusterService, IngestService, ClusterInfoService, and so on.

ScriptModule holds the ScriptService, through which instances of the script engines configured in ES can be obtained. AnalysisModule holds the AnalysisRegistry, through which instances of the analyzers configured in ES can be obtained. SettingsModule stores, grouped by type, the setting objects that ES can parse. NetworkService is mainly used to resolve network addresses, and ClusterService is used to maintain cluster information. The code is as follows:

 final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
 final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
 for (final ExecutorBuilder<?> builder : threadPool.builders()) {
     additionalSettings.addAll(builder.getRegisteredSettings());
 }
 client = new NodeClient(settings, threadPool);
 final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
 final ScriptModule scriptModule = ScriptModule.create(settings, this.environment, resourceWatcherService,
     pluginsService.filterPlugins(ScriptPlugin.class));
 AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
 additionalSettings.addAll(scriptModule.getSettings());
 // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
 // so we might be late here already
 final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
 scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
 resourcesToClose.add(resourceWatcherService);
 final NetworkService networkService = new NetworkService(settings,
     getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
 final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
     localNodeFactory::getNode);
 clusterService.addStateApplier(scriptModule.getScriptService());
 resourcesToClose.add(clusterService);
 final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
     scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
 final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
Step 6. Create the ModulesBuilder and add the various Modules.

ES uses Google's open-source Guice to manage dependencies within the program. The Modules added to the ModulesBuilder include: the Modules provided by plugins, obtained through PluginsService; NodeModule, which holds the MonitorService; ClusterModule, which holds the ClusterService and the related ClusterPlugins; IndicesModule, which holds the MapperPlugins; SearchModule, which holds the related SearchPlugins; ActionModule, which holds the ThreadPool, ActionPlugins, NodeClient and CircuitBreakerService; GatewayModule; RepositoriesModule, which holds the RepositoryPlugins; SettingsModule, which holds the various setting objects ES can use; and so on. Finally, the modules' createInjector method is called to create the application's dependency injector. A minimal sketch of this Guice wiring is shown after this paragraph.
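To make the wiring concrete, here is a minimal, self-contained sketch of the pattern using plain Guice (an assumption for illustration: ES 5.x actually bundles its own fork under org.elasticsearch.common.inject and collects modules through a ModulesBuilder helper). The module and service classes below are made-up stand-ins; the point is only how modules are gathered, how an injector is created from them, and how instances are then retrieved, mirroring the injector.getInstance(...) calls seen later.

 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;

 public class GuiceWiringSketch {

     // Stand-in for a service owned by a module, e.g. ClusterService inside ClusterModule.
     static class DemoService {
         String describe() { return "service bound by DemoModule"; }
     }

     // Stand-in for one of the Modules added to the ModulesBuilder: it binds the services it owns.
     static class DemoModule extends AbstractModule {
         @Override
         protected void configure() {
             bind(DemoService.class).toInstance(new DemoService());
         }
     }

     public static void main(String[] args) {
         // Node adds many modules (NodeModule, ClusterModule, ActionModule, ...) and
         // builds a single injector from them; here one module stands in for all of them.
         Injector injector = Guice.createInjector(new DemoModule());

         // Later code asks the injector for instances, e.g. injector.getInstance(SomeService.class).
         DemoService service = injector.getInstance(DemoService.class);
         System.out.println(service.describe());
     }
 }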

Step 7. Collect the LifecycleComponent objects from each plugin and initialize the NodeClient.

The code is as follows:

 List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
     .filter(p -> p instanceof LifecycleComponent)
     .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
 pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
     .map(injector::getInstance).collect(Collectors.toList()));
 resourcesToClose.addAll(pluginLifecycleComponents);
 this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
 client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
         () -> clusterService.localNode().getId());

 if (NetworkModule.HTTP_ENABLED.get(settings)) {
     logger.debug("initializing HTTP handlers ...");
     actionModule.initRestHandlers(() -> clusterService.state().nodes());
 }
 logger.info("initialized");
Step 8. Call Node's start method, which in turn calls the start methods of each important module.

Each key service is started in order. The code is as follows:

 // hack around dependency injection problem (for now...)
 injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
 pluginLifecycleComponents.forEach(LifecycleComponent::start);

 injector.getInstance(MappingUpdatedAction.class).setClient(client);
 injector.getInstance(IndicesService.class).start();
 injector.getInstance(IndicesClusterStateService.class).start();
 injector.getInstance(IndicesTTLService.class).start();
 injector.getInstance(SnapshotsService.class).start();
 injector.getInstance(SnapshotShardsService.class).start();
 injector.getInstance(RoutingService.class).start();
 injector.getInstance(SearchService.class).start();
 injector.getInstance(MonitorService.class).start();

 final ClusterService clusterService = injector.getInstance(ClusterService.class);

 final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
 nodeConnectionsService.start();
 clusterService.setNodeConnectionsService(nodeConnectionsService);

 // TODO hack around circular dependencies problems
 injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));

 injector.getInstance(ResourceWatcherService.class).start();
 injector.getInstance(GatewayService.class).start();
 Discovery discovery = injector.getInstance(Discovery.class);
 clusterService.setDiscoverySettings(discovery.getDiscoverySettings());
 clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());
 clusterService.setClusterStatePublisher(discovery::publish);

 // start before the cluster service since it adds/removes initial Cluster state blocks
 final TribeService tribeService = injector.getInstance(TribeService.class);
 tribeService.start();

 // Start the transport service now so the publish address will be added to the local disco node in ClusterService
 TransportService transportService = injector.getInstance(TransportService.class);
 transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
 transportService.start();
 validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream()
     .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

 clusterService.addStateApplier(transportService.getTaskManager());
 clusterService.start();
 assert localNodeFactory.getNode() != null;
 assert transportService.getLocalNode().equals(localNodeFactory.getNode())
     : "transportService has a different local node than the factory provided";
 assert clusterService.localNode().equals(localNodeFactory.getNode())
     : "clusterService has a different local node than the factory provided";
 // start after cluster service so the local disco is known
 discovery.start();
 transportService.acceptIncomingRequests();
 discovery.startInitialJoin();
 // tribe nodes don't have a master so we shouldn't register an observer
 final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
 if (initialStateTimeout.millis() > 0) {
     final ThreadPool thread = injector.getInstance(ThreadPool.class);
     ClusterState clusterState = clusterService.state();
     ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
     if (clusterState.nodes().getMasterNodeId() == null) {
         logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
         final CountDownLatch latch = new CountDownLatch(1);
         observer.waitForNextChange(new ClusterStateObserver.Listener() {
             @Override
             public void onNewClusterState(ClusterState state) { latch.countDown(); }

             @Override
             public void onClusterServiceClose() {
                 latch.countDown();
             }

             @Override
             public void onTimeout(TimeValue timeout) {
                 logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                     initialStateTimeout);
                 latch.countDown();
             }
         }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

         try {
             latch.await();
         } catch (InterruptedException e) {
             throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
         }
     }
 }


 if (NetworkModule.HTTP_ENABLED.get(settings)) {
     injector.getInstance(HttpServerTransport.class).start();
 }

 if (WRITE_PORTS_FILE_SETTING.get(settings)) {
     if (NetworkModule.HTTP_ENABLED.get(settings)) {
         HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
         writePortsFile("http", http.boundAddress());
     }
     TransportService transport = injector.getInstance(TransportService.class);
     writePortsFile("transport", transport.boundAddress());
 }

 // start nodes now, after the http server, because it may take some time
 tribeService.startNodes();
 logger.info("started");