一、env.execute()
核心逻辑
executeAsync
checkNotNull(streamGraph, "StreamGraph cannot be null."); checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file."); final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration); checkNotNull( executorFactory, "Cannot find compatible factory for specified execution.target (=%s)", configuration.get(DeploymentOptions.TARGET));
//这行是关键
CompletableFuture<JobClient> jobClientFuture = executorFactory
        //获取执行器
        .getExecutor(configuration)
        //执行
        .execute(streamGraph, configuration, userClassloader);
try {
    JobClient jobClient = jobClientFuture.get();
    jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
    return jobClient;
}
.getExecutor(configuration)
执行器工厂(PipelineExecutorFactory)有多种实现,具体使用哪一个由配置中的 execution.target 决定
.execute(streamGraph, configuration, userClassloader);
//把streamGraph转换成JobGraph
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); try (
//初始化并启动 YarnClient,创建 YARN 集群描述符
final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration))
{
    //执行配置访问器(后面用它读取 detached 模式)
    final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
    //拿到 JobManager 内存、TaskManager 内存、每个 TaskManager 的 slot 数
    final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
    //部署之前先做各种检查,然后启动 AppMaster
    final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
            .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
    LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
    //把 clusterClientProvider 包装成 JobClient 返回
    return CompletableFuture.completedFuture(
            new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
}
clusterClientFactory.createClusterDescriptor(configuration)
具体实现:初始化、启动
final YarnClient yarnClient = YarnClient.createYarnClient(); final YarnConfiguration yarnConfiguration = new YarnConfiguration(); yarnClient.init(yarnConfiguration); yarnClient.start(); return new YarnClusterDescriptor( configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), false);
clusterClientFactory.getClusterSpecification(configuration);
具体实现:
checkNotNull(configuration); final int jobManagerMemoryMB = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY) .getTotalProcessMemorySize() .getMebiBytes(); final int taskManagerMemoryMB = TaskExecutorProcessUtils .processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY)) .getTotalProcessMemorySize() .getMebiBytes(); int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); return new ClusterSpecification.ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMB) .setTaskManagerMemoryMB(taskManagerMemoryMB) .setSlotsPerTaskManager(slotsPerTaskManager) .createClusterSpecification();
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
具体实现:部署
return deployInternal( clusterSpecification, "Flink per-job cluster", getYarnJobClusterEntrypoint(), jobGraph, detached);
再进一步的具体实现 deployInternal,参数如下(其中 yarnClusterEntrypoint 是程序入口):
ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached
isReadyForDeployment(clusterSpecification);
// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);
// ------------------ Check if the YARN ClusterClient has the requested resources --------------
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
Resource maxRes = appResponse.getMaximumResourceCapability();
//启动 AppMaster,并初始化文件系统
ApplicationReport report = startAppMaster( flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, yarnClient, yarnApplication, validClusterSpecification); setClusterEntrypointInfoToConfig(report); return () -> { try { return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()); } catch (Exception e) { throw new RuntimeException("Error while creating RestClusterClient.", e); } };