kubernetes - ProgramInvocationException: The main method caused an error: null when running a Flink task in a Kubernetes cluster
Problem description
I am submitting a job through the Web UI of Flink 1.10.0 (Version: 1.10.0 Commit: aa4eb8f @ 07.02.2020 @ 19:18:19 CET) running in my Kubernetes cluster (v1.15.2), and the task manager fails with the following error:
2020-04-04 16:38:02,276 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:50)
The same code runs fine on my local machine. Here is my job code:
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.initEnv(env);
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("87.102.95.23")
                .setPort(5672)
                .setUserName("admin123")
                .setPassword("123456")
                .setVirtualHost("my_vhost")
                .build();
        RMQSource rmqSource = new RMQSource<>(connectionConfig,
                BizGlobalConstant.WALLET_CONSUME_REPORT_MESSAGE_QUEUE_NAME,
                true,
                new SimpleStringSchema());
        DataStream<String> dataStreamSource = env.addSource(rmqSource);
        DataStream<ReportWalletConsumeRecord> consumeRecord =
                dataStreamSource.map(new MapFunction<String, ReportWalletConsumeRecord>() {
                    @Override
                    public ReportWalletConsumeRecord map(String value) throws Exception {
                        Gson gson = new Gson();
                        ReportWalletConsumeRecord consumeRecord = gson.fromJson(value, ReportWalletConsumeRecord.class);
                        return consumeRecord;
                    }
                });
        if (consumeRecord != null) {
            consumeRecord.keyBy("consumeItem")
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sum("consumeNum")
                    .addSink(new SinkFunction<ReportWalletConsumeRecord>() {
                        @Override
                        public void invoke(ReportWalletConsumeRecord value, SinkFunction.Context context) throws Exception {
                            Log.info("value" + value);
                            //publisher.publish(subscribePublish, value, true);
                        }
                    });
        }
        env.execute(StreamingJob.class.getName());
    }
}
What should I do to make it work in the remote cluster?
Solution
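One observation about the trace, offered as a diagnostic sketch rather than a confirmed fix: the innermost frames shown belong to Spring Boot's loader (MainMethodRunner), which calls the job's main method by reflection, and the trace is cut off before any deeper "Caused by" entry. Reflection wraps whatever the invoked method throws in an InvocationTargetException whose own message is null, which would be consistent with the "error: null" text. The small example below illustrates that behavior and how to walk the cause chain to the real exception. The names RootCauseDemo and failingMain are hypothetical and not part of the original job.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class RootCauseDemo {

    /** Hypothetical stand-in for a job's main method that fails at startup. */
    public static void failingMain(String[] args) {
        throw new IllegalStateException("simulated failure inside main");
    }

    /** Walks the cause chain and returns the innermost throwable. */
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** Invokes failingMain by reflection and returns the wrapper that reflection throws. */
    public static InvocationTargetException invokeReflectively() throws Exception {
        Method main = RootCauseDemo.class.getMethod("failingMain", String[].class);
        try {
            // Cast to Object so the array is passed as a single argument.
            main.invoke(null, (Object) new String[0]);
        } catch (InvocationTargetException e) {
            return e;
        }
        throw new AssertionError("failingMain was expected to throw");
    }

    public static void main(String[] args) throws Exception {
        InvocationTargetException wrapper = invokeReflectively();
        // The wrapper carries no message of its own.
        System.out.println("wrapper message: " + wrapper.getMessage());
        // prints "wrapper message: null"
        System.out.println("root cause: " + rootCause(wrapper));
        // prints "root cause: java.lang.IllegalStateException: simulated failure inside main"
    }
}
```

If this applies here, the full JobManager log should contain further "Caused by" entries below the Spring Boot loader frames, and those would name the exception actually thrown from StreamingJob.main.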