java - How to fix NotSerializableException when using Beam's KafkaIO on the FlinkRunner
Problem description
I am trying to run an Apache Beam application on a Flink cluster, but it fails while translating the Kafka UnboundedSource with the error [partitions type:ARRAY pos:0] is not serializable. The application is a word-count example that reads from one Kafka topic and publishes to another, and it works fine with Beam's direct runner.
I followed Beam's Java Quickstart to create a pom.xml and then added the KafkaIO SDK. I am running a single-node local Flink 1.8.1 cluster and Kafka 2.3.0.
pom.xml snippet
<properties>
<beam.version>2.14.0</beam.version>
<flink.artifact.name>beam-runners-flink-1.8</flink.artifact.name>
<flink.version>1.8.1</flink.version>
</properties>
...
<profile>
<id>flink-runner</id>
<!-- Makes the FlinkRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<!-- Please see the Flink Runner page for an up-to-date list
of supported Flink versions and their artifact names:
https://beam.apache.org/documentation/runners/flink/ -->
<artifactId>${flink.artifact.name}</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Tried with and without this flink-avro dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</profile>
...
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
KafkaWordCount.java snippet
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> counts = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers(options.getBootstrapServer())
.withTopics(Collections.singletonList(options.getInputTopic()))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"latest"))
.withoutMetadata() // PCollection<KV<String, String>> instead of KafkaRecord type
)
Full error message, produced by submitting the Beam jar to Flink with /opt/flink/bin/flink run -c org.apache.beam.examples.KafkaWordCount target/word-count-beam-bundled-0.1.jar --runner=FlinkRunner --bootstrapServer=localhost:9092
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@65be88ae
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@65be88ae
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:233)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:281)
at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:157)
at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:136)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88)
at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:116)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
at org.apache.beam.examples.KafkaWordCount.runWordCount(KafkaWordCount.java:99)
at org.apache.beam.examples.KafkaWordCount.main(KafkaWordCount.java:106)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more
Caused by: org.apache.flink.api.common.InvalidProgramException: [partitions type:ARRAY pos:0] is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:140)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1470)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:218)
... 32 more
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at java.base/java.util.ArrayList.writeObject(ArrayList.java:896)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1130)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:122)
... 42 more
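The bottom of the trace shows the actual failure: Flink's ClosureCleaner Java-serializes the source object, and serializing KafkaUnboundedSource eventually reaches an org.apache.avro.Schema$Field, which does not implement Serializable. A minimal self-contained sketch of that failure mode follows; the class names are stand-ins for illustration, not real Beam or Avro types:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCleanerDemo {
    // Stand-in for org.apache.avro.Schema.Field: a class that does NOT implement Serializable.
    static class AvroLikeField {
        final String name = "partitions";
    }

    // Stand-in for the source object: Serializable itself, but it holds a
    // non-serializable field, so the whole object graph fails to serialize.
    static class SourceLikeObject implements Serializable {
        final AvroLikeField field = new AvroLikeField();
    }

    // Mimics what Flink's ClosureCleaner effectively does: attempt to
    // Java-serialize the object graph and report whether it succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException; thrown when the
            // traversal hits the non-serializable nested field.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new SourceLikeObject())); // false
        System.out.println(isSerializable("plain string"));         // true
    }
}
```

This is why the error names a field ([partitions type:ARRAY pos:0]) rather than the source class itself: the source is serializable, but one object it references is not.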
Update
It turns out there is a known issue in Beam related to running on Flink, which appears to be this one: https://issues.apache.org/jira/browse/BEAM-7478. One comment in particular notes that flink run cannot be used with KafkaIO because Avro's Schema.Field is not serializable: https://issues.apache.org/jira/browse/BEAM-7478?focusedCommentId=16902419&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16902419
Update 2
As mentioned in the comments, one workaround is to downgrade Flink to 1.8.0.
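A minimal sketch of the pom.xml change for that workaround, keeping the other properties from the snippet above unchanged; this assumes the beam-runners-flink-1.8 artifact also works against Flink 1.8.0:

```xml
<properties>
  <beam.version>2.14.0</beam.version>
  <flink.artifact.name>beam-runners-flink-1.8</flink.artifact.name>
  <!-- Downgraded from 1.8.1 to work around the Schema.Field serialization failure -->
  <flink.version>1.8.0</flink.version>
</properties>
```

After changing the property, rebuild the bundled jar and resubmit; presumably the cluster itself should also be running the matching Flink 1.8.0 release.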