java - Beam on Flink runner:ClassNotFoundException:org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
问题描述
我正在尝试使用 Beam 和 Flink 运行器设置流处理管道。Flink 是具有以下 docker-compose 文件的本地会话部署:
version: "3"
services:
jobmanager:
image: flink:1.12.0-scala_2.12-java11
container_name: flink-jobmanager
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
ports:
- 8081:8081
command: jobmanager
networks:
- flink
taskmanager:
image: flink:1.12.0-scala_2.12-java11
container_name: flink-taskmanager
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
command: taskmanager
networks:
- flink
networks:
flink:
下面是我的 Beam 应用程序的 pom.xml:
<properties>
<beam.version>2.27.0</beam.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.12</artifactId>
<version>${beam.version}</version>
</dependency>
</dependencies>
我的 Beam 应用程序使用 Beam Direct 运行程序运行得非常好,但是当我尝试使用 Flink 运行程序运行时,我得到了以下异常:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:470)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1122)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:277)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:69)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1373)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:700)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Unknown Source)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.util.ArrayList.readObject(Unknown Source)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:467)
... 14 more
JDK版本:Oracle Java 11
执行命令:
mvn exec:java -Dexec.mainClass=vn.duclm.beam.BeamApplication \
-Dexec.args="--runner=FlinkRunner \
--flinkMaster=localhost:8081 \
--filesToStage=target/beam-hello.jar"
我尝试过同步 Flink 和 Beam 的版本,但没有成功。我现在不知道可以做些什么来解决这个问题。提前感谢您的帮助。
解决方案
这看起来是您的 Flink 客户端使用与 Flink 集群不同版本的 Scala 库的问题。我相信 flink-runner 仍然链接到 Scala 2.11 库。尝试修改您pom.xml
的如下:
<properties>
<beam.version>2.27.0</beam.version>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.12</artifactId>
<version>${beam.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-stream-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
相反,您可以指定使用 Scala 2.11 的 Flink 容器。
推荐阅读
- angular - ngx-graph 通常不会在 Angular 页面中加载
- python-3.x - Django:将价格注释到行,然后按字段对行进行分组,并将价格总和注释到组
- amazon-web-services - 如何通过用户名和密码在aws lambda函数中获取cognitoIdentityId
- python - 从邻接表创建邻接矩阵
- django - 我怎样才能使 celery 在丢弃任务方面更加健壮?
- javascript - 如何在 TextInput 中按“Enter”键调用函数 - React Native
- ios - 在我继续之前,如何保证 Swift 闭包(引用 Firebase)完全执行?
- c# - 具有递归集合的 C# 克隆类
- python - 在python中逐行解析数据直到EOF
- visual-studio-code - VS Code 复制粘贴格式化文本在 Linux 上不起作用