首页 > 技术文章 > 【Flink提交流程源码】三、StreamExecutionEnvironment 初始化并启动 YarnClient，然后启动 ApplicationMaster

fi0108 2021-06-18 17:05 原文

一、env.execute()

核心逻辑

executeAsync(streamGraph) 方法

                checkNotNull(streamGraph, "StreamGraph cannot be null.");
		checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

		final PipelineExecutorFactory executorFactory =
			executorServiceLoader.getExecutorFactory(configuration);

		checkNotNull(
			executorFactory,
			"Cannot find compatible factory for specified execution.target (=%s)",
			configuration.get(DeploymentOptions.TARGET));
//这行是关键 CompletableFuture<JobClient> jobClientFuture = executorFactory
//获取的执行器 .getExecutor(configuration)
               //执行 .execute(streamGraph, configuration, userClassloader); try { JobClient jobClient = jobClientFuture.get(); jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null)); return jobClient; }

  

.getExecutor(configuration)
执行器工厂有很多种实现，executorServiceLoader 会根据配置中的 execution.target 选出匹配的工厂，再由该工厂通过 getExecutor(configuration) 创建执行器。

 

 

 

.execute(streamGraph, configuration, userClassloader);

 

 

 

                //把streamGraph转换成JobGraph
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); try (
//yarn初始化、启动yarnCli
final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration))


{
//权限 final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); //拿到jobmanage、taskmanage、slot final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); //部署之前先各种检查,启动appMaster final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); // return CompletableFuture.completedFuture( new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader)); }
clusterClientFactory.createClusterDescriptor(configuration)
具体实现：创建 YarnClient 并完成初始化（init）和启动（start），然后用它构造 YarnClusterDescriptor。
final YarnClient yarnClient = YarnClient.createYarnClient();
		final YarnConfiguration yarnConfiguration = new YarnConfiguration();

		yarnClient.init(yarnConfiguration);
		yarnClient.start();

		return new YarnClusterDescriptor(
				configuration,
				yarnConfiguration,
				yarnClient,
				YarnClientYarnClusterInformationRetriever.create(yarnClient),
				false);

  

 

clusterClientFactory.getClusterSpecification(configuration);

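具体实现：从配置中计算 JobManager 进程总内存、TaskManager 进程总内存（单位 MB）以及每个 TaskManager 的 slot 数，并据此构造 ClusterSpecification。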

                checkNotNull(configuration);

		final int jobManagerMemoryMB = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
				configuration,
				JobManagerOptions.TOTAL_PROCESS_MEMORY)
			.getTotalProcessMemorySize()
			.getMebiBytes();

		final int taskManagerMemoryMB = TaskExecutorProcessUtils
			.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
				configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
			.getTotalProcessMemorySize()
			.getMebiBytes();

		int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

		return new ClusterSpecification.ClusterSpecificationBuilder()
			.setMasterMemoryMB(jobManagerMemoryMB)
			.setTaskManagerMemoryMB(taskManagerMemoryMB)
			.setSlotsPerTaskManager(slotsPerTaskManager)
			.createClusterSpecification();

  

clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
具体实现：调用 deployInternal 部署一个 per-job 集群（应用名为 "Flink per-job cluster"）。

return deployInternal(
				clusterSpecification,
				"Flink per-job cluster",
				getYarnJobClusterEntrypoint(),
				jobGraph,
				detached);

deployInternal 的具体实现。参数如下，其中 yarnClusterEntrypoint 是程序入口，后面会传给 startAppMaster，用于启动 ApplicationMaster：

                        ClusterSpecification clusterSpecification,
			String applicationName,
			String yarnClusterEntrypoint,
			@Nullable JobGraph jobGraph,
			boolean detached

  



isReadyForDeployment(clusterSpecification);

// ------------------ Check if the specified queue exists --------------------

checkYarnQueues(yarnClient);

// ------------------ Check if the YARN ClusterClient has the requested resources --------------

// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

Resource maxRes = appResponse.getMaximumResourceCapability();


//启动APPMaster,初始化文件系统,
ApplicationReport report = startAppMaster( flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, yarnClient, yarnApplication, validClusterSpecification); setClusterEntrypointInfoToConfig(report); return () -> { try { return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()); } catch (Exception e) { throw new RuntimeException("Error while creating RestClusterClient.", e); } };

  

 

推荐阅读