java - 如何使用 Java 代码向 Flink 集群提交作业?
问题描述
我已经将一个包含我的应用程序代码的 fat jar 上传到我的 Flink 集群中所有节点的 /lib 文件夹中。我正在尝试从单独的 java 应用程序启动 Flink 作业,但找不到这样做的好方法。
我目前找到的最接近解决方案的是具有运行作业 API 的 Monitoring Rest API。但是,这只允许您运行通过作业上传功能提交的作业。
我在 flink-client 模块中看到了ClusterClient.java,但看不到任何关于我如何使用它的示例。
任何有关某人如何通过 java 代码成功提交作业的示例将不胜感激!
解决方案
您可以使用它RestClusterClient
来运行PackagedProgram
指向您的 Flink 作业的。如果你的工作接受一些参数,你可以传递它们。
以下是运行在 上的独立集群的示例localhost:8081
:
// import org.apache.flink.api.common.JobSubmissionResult;
// import org.apache.flink.client.deployment.StandaloneClusterId;
// import org.apache.flink.client.program.PackagedProgram;
// import org.apache.flink.client.program.rest.RestClusterClient;
// import org.apache.flink.configuration.Configuration;
// import org.apache.flink.configuration.JobManagerOptions;
// import org.apache.flink.configuration.RestOptions;
String clusterHost = "localhost";
int clusterPort = 8081;
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, clusterHost);
config.setInteger(RestOptions.PORT, clusterPort);
String jarFilePath = "/opt/flink/examples/streaming/SocketWindowWordCount.jar";
String[] args = new String[]{ "--port", "9000" };
PackagedProgram packagedProgram = new PackagedProgram(new File(jarFilePath), args);
RestClusterClient<StandaloneClusterId> client =
new RestClusterClient<StandaloneClusterId>(config, StandaloneClusterId.getInstance());
int parallelism = 1;
JobSubmissionResult result = client.run(packagedProgram, parallelism);
推荐阅读
- java - 服务层和spring事务中的验证
- java - Spring JpaRepository ENUM 不匹配
- akka-stream - Akka Streams:具有自定义逻辑的扇出运算符
- request - 创建peyment(条纹)时出错
- javascript - 如果传递的项目存在于 arrayOfArrays 内的所有数组中,则返回 true
- cordova - Meteor为cordova插件设置android本地文件路径
- javascript - 与 Create-React-App 一起使用时,我们应该导入整个库还是特定的单个组件?
- node.js - 为什么等待新的 Promise 导致立即退出?
- c# - 长时间运行的任务在事件触发后推迟执行顺序先前的代码
- c++ - 禁止“重命名”复制文件