【Flink原始碼】四、YarnJobClusterEntrypoint
阿新 • • 發佈:2021-06-18
一、YarnJobClusterEntrypoint
進入main方法
SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); Map<String, String> env = System.getenv(); final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key()); Preconditions.checkArgument( workingDirectory != null, "Working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key()); try { YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG); } catch (IOException e) { LOG.warn("Could not log YARN environment information.", e); } final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit( args, new DynamicParametersConfigurationParserFactory(), YarnJobClusterEntrypoint.class); final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env); YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration); //執行程式的入口 ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
clusterEntrypoint.startCluster();
securityContext.runSecured((Callable<Void>) () -> {
runCluster(configuration, pluginManager);
return null;
});
synchronized (lock) {
//初始化服務rpc相關
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
//建立ResourceManager,建立、啟動Dispatcher,啟動ResourceManager
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
建立ResourceManager、Dispatcher,並啟動
clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), this);
具體實現:建立並啟動Web監控的REST端點(webMonitorEndpoint)
webMonitorEndpoint = restEndpointFactory.createRestEndpoint( configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, blobServer, executor, metricFetcher, highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), fatalErrorHandler); log.debug("Starting Dispatcher REST endpoint."); webMonitorEndpoint.start();
ResourceManager的建立
resourceManager = resourceManagerFactory.createResourceManager( configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname, ioExecutor);
建立Dispatcher
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);
選舉服務:每個元件都有選舉服務,最終都會呼叫contender的grantLeadership方法
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
具體實現:
最終執行這個lambda表示式:
previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
start的實現
runIfStateIs( State.CREATED, this::startInternal);
final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), Collections.singleton(jobGraph), ThrowingJobGraphWriter.INSTANCE);
最終建立dispatcher,並啟動
final Dispatcher dispatcher; try { dispatcher = dispatcherFactory.createDispatcher( rpcService, fencingToken, recoveredJobs, (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); } dispatcher.start();
最終透過rpc呼叫(akka元件通訊)執行onStart方法
rpcServer.start();