ProgramInvocationException: The main method caused an error: null when running a Flink job in a Kubernetes cluster

Problem description

I am submitting a job through the Web UI (Version: 1.10.0, Commit: aa4eb8f @ 07.02.2020 @ 19:18:19 CET) of Flink (version 1.10.0) in my Kubernetes cluster (v1.15.2), and the task manager reports the following error:

2020-04-04 16:38:02,276 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:50)

The same code runs fine on my local machine. Here is my job code:

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.initEnv(env);

        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("87.102.95.23")
                .setPort(5672)
                .setUserName("admin123")
                .setPassword("123456")
                .setVirtualHost("my_vhost")
                .build();

        RMQSource rmqSource = new RMQSource<>(connectionConfig,
                BizGlobalConstant.WALLET_CONSUME_REPORT_MESSAGE_QUEUE_NAME,
                true,
                new SimpleStringSchema());
        DataStream<String> dataStreamSource = env.addSource(rmqSource);

        DataStream<ReportWalletConsumeRecord> consumeRecord =
                dataStreamSource.map(new MapFunction<String, ReportWalletConsumeRecord>() {
                    @Override
                    public ReportWalletConsumeRecord map(String value) throws Exception {
                        Gson gson = new Gson();
                        ReportWalletConsumeRecord consumeRecord = gson.fromJson(value, ReportWalletConsumeRecord.class);
                        return consumeRecord;
                    }
                });

        if (consumeRecord != null) {
            consumeRecord.keyBy("consumeItem")
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sum("consumeNum")
                    .addSink(new SinkFunction<ReportWalletConsumeRecord>() {
                        @Override
                        public void invoke(ReportWalletConsumeRecord value, SinkFunction.Context context) throws Exception {
                            Log.info("value" + value);
                            //publisher.publish(subscribePublish, value, true);
                        }
                    });
        }
        env.execute(StreamingJob.class.getName());
    }

}

What should I do to make it work in the remote cluster?

Tags: kubernetes

Solution
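
One observation that may help narrow this down: the trace ends in java.lang.reflect.InvocationTargetException raised under org.springframework.boot.loader.MainMethodRunner, which suggests the jar's entry point is the Spring Boot launcher and that it calls StreamingJob.main reflectively. A reflective call wraps whatever main throws in an InvocationTargetException whose own message is null, which would be consistent with the "error: null" text; the real failure would then be the wrapper's cause, which is not shown in the posted trace. The sketch below is plain Java, independent of Flink and Spring Boot, and only demonstrates this wrapping behaviour. The names ReflectiveMainDemo and FailingJob are hypothetical stand-ins, not part of the original job.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveMainDemo {

    /** Hypothetical stand-in for a job whose main method fails; the exception type is arbitrary. */
    public static class FailingJob {
        public static void main(String[] args) {
            throw new IllegalStateException("real failure inside main");
        }
    }

    /** Invokes FailingJob.main reflectively and returns the exception that reflection throws. */
    public static InvocationTargetException invokeReflectively() throws Exception {
        Method main = FailingJob.class.getMethod("main", String[].class);
        try {
            // The cast keeps the String[] from being expanded into varargs.
            main.invoke(null, (Object) new String[0]);
        } catch (InvocationTargetException e) {
            return e;
        }
        throw new AssertionError("expected main to fail");
    }

    public static void main(String[] args) throws Exception {
        InvocationTargetException wrapper = invokeReflectively();
        // The wrapper carries no message of its own.
        System.out.println("wrapper message: " + wrapper.getMessage());
        // The original exception is only reachable through getCause().
        System.out.println("real cause: " + wrapper.getCause());
    }
}
```

If this reading is right, the lines after the last "Caused by:" in the full log should name the actual exception. It may also be worth checking whether the job jar was built with the Spring Boot packaging plugin rather than as a plain fat jar, since that would explain the launcher frames; this is an assumption based only on the trace above.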

