1. 程式人生 > 其它 >【Flink原始碼】四、YarnJobClusterEntrypoint

【Flink原始碼】四、YarnJobClusterEntrypoint

一、YarnJobClusterEntrypoint

進入main方法

SignalHandler.register(LOG);
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		Map<String, String> env = System.getenv();

		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
		Preconditions.checkArgument(
			workingDirectory != null,
			"Working directory variable (%s) not set",
			ApplicationConstants.Environment.PWD.key());

		try {
			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
		} catch (IOException e) {
			LOG.warn("Could not log YARN environment information.", e);
		}

		final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit(
			args,
			new DynamicParametersConfigurationParserFactory(),
			YarnJobClusterEntrypoint.class);
		final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

		YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);
          //執行程式的入口
		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

  

ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

clusterEntrypoint.startCluster();

securityContext.runSecured((Callable<Void>) () -> {
runCluster(configuration, pluginManager);

return null;
});


synchronized (lock) {
  //初始化服務rpc相關
initializeServices(configuration, pluginManager);

// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

//建立ResourceManage,建立、啟動Dispatcher,啟動ResourceManage
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);

clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}

  建立ResourceMange、Dispatcher,並啟動

clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				ioExecutor,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
				this); 

具體實現:網頁的開啟

webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
				fatalErrorHandler);

			log.debug("Starting Dispatcher REST endpoint.");
			webMonitorEndpoint.start();

 resourceManage的啟動

resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				metricRegistry,
				hostname,
				ioExecutor);

  建立Dispatcher

dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
				highAvailabilityServices.getDispatcherLeaderElectionService(),
				fatalErrorHandler,
				new HaServicesJobGraphStoreFactory(highAvailabilityServices),
				ioExecutor,
				rpcService,
				partialDispatcherServices);

  選舉服務:每個元件都有選舉服務,最終要呼叫這個

contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);

  具體實現:

最終執行這個東西:lamda表示式

previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));

  start的實現

runIfStateIs(
			State.CREATED,
			this::startInternal);

  

final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
			DispatcherId.fromUuid(getLeaderSessionId()),
			Collections.singleton(jobGraph),
			ThrowingJobGraphWriter.INSTANCE);

  最終啟動dispatcher,並啟動

final Dispatcher dispatcher;
		try {
			dispatcher = dispatcherFactory.createDispatcher(
				rpcService,
				fencingToken,
				recoveredJobs,
				(dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
				PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
		} catch (Exception e) {
			throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
		}

		dispatcher.start();

  最終rpc呼叫,akka元件通訊onStart方法

rpcServer.start();