Java client fails to connect to a local Flink cluster

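Problem description

I am trying out a small program against a local Flink cluster, set up following the instructions here. The example wordcount program runs fine, but when I run my own program it stalls while connecting to the JobManager and then fails. This is Flink 1.5 with JDK 1.8.

The relevant part of the code is: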

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setStreaming(true);
options.setFlinkMaster("localhost:6123");
options.setRunner(FlinkRunner.class);

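I start the cluster with start-cluster.sh, and I can see two processes (the JobManager and the TaskManager) running. The Flink logs show very little. On the client side, with debug logging turned on, I see the following: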

18:43:20.507 [flink-akka.actor.default-dispatcher-4] INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@talonx:38183]
18:43:20.511 [main] INFO org.apache.flink.client.program.StandaloneClusterClient - Actor system started at akka.tcp://flink@talonx:38183
18:43:20.511 [main] INFO org.apache.flink.client.program.StandaloneClusterClient - Submitting job with JobID: dbf63281771465550fd3598b2b67b91f. Waiting for job completion.
Submitting job with JobID: dbf63281771465550fd3598b2b67b91f. Waiting for job completion.
18:43:20.521 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received SubmitJobAndWait(JobGraph(jobId: dbf63281771465550fd3598b2b67b91f)) but there is no connection to a JobManager yet.
18:43:20.522 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received job test-talonx-0618131319-b721a69a (dbf63281771465550fd3598b2b67b91f).
18:43:20.523 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager null.

After a while, I get the following exception on the client:

19:03:19.396 [main] ERROR org.apache.beam.runners.flink.FlinkRunner - Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:449)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:126)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:115)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at Test.main(Test.java:106)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
    ... 10 common frames omitted
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What could be missing here?

Tags: apache-flink, apache-beam, flink-streaming

Solution
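
One low-cost check, offered as a sketch and not as a confirmed fix: verify from the client machine that the address passed to setFlinkMaster accepts TCP connections at all. The class name PortCheck, the helper isReachable, and the 2-second timeout are hypothetical and are not part of Flink or Beam. A successful connect only rules out a network-level problem; it says nothing about whether the client libraries and the cluster speak the same protocol version.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper; not part of Flink or Beam.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Same host and port as the setFlinkMaster call in the question.
        String host = "localhost";
        int port = 6123;
        boolean ok = isReachable(host, port, 2000); // 2 s timeout is an assumption
        System.out.println(host + ":" + port
                + (ok ? " accepts TCP connections" : " is not reachable"));
    }
}
```

If the port is not reachable, the JobManager's bind address and port configuration are the first things to inspect. If it is reachable and submission still times out, a mismatch between the Flink version bundled with the Beam runner and the Flink version of the cluster is worth ruling out next.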

