How to fix a NotSerializableException when using Beam's KafkaIO on the FlinkRunner

Problem description

I am trying to run an Apache Beam application on a Flink cluster, but it fails while translating the Kafka UnboundedSource with the error `[partitions type:ARRAY pos:0] is not serializable`. The application is a word-count example that reads from one Kafka topic and publishes to another, and it works fine with Beam's direct runner.

I created the pom.xml by following Beam's Java quickstart and then added the KafkaIO SDK. I am running a single-node local Flink 1.8.1 cluster and Kafka 2.3.0.

pom.xml snippet

    <properties>
      <beam.version>2.14.0</beam.version>
      <flink.artifact.name>beam-runners-flink-1.8</flink.artifact.name>
      <flink.version>1.8.1</flink.version>
    </properties>
...
    <profile>
      <id>flink-runner</id>
      <!-- Makes the FlinkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <!-- Please see the Flink Runner page for an up-to-date list
               of supported Flink versions and their artifact names:
               https://beam.apache.org/documentation/runners/flink/ -->
          <artifactId>${flink.artifact.name}</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!-- Tried with and without this flink-avro dependency -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-avro</artifactId>
          <version>${flink.version}</version>
        </dependency>
      </dependencies>
    </profile>
...
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-kafka</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.3.0</version>
    </dependency>

KafkaWordCount.java snippet

        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(options);

        PCollection<KV<String, Long>> counts = p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers(options.getBootstrapServer())
                .withTopics(Collections.singletonList(options.getInputTopic()))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"latest"))
                .withoutMetadata() // PCollection<KV<String, String>> instead of KafkaRecord type
        )

The full error message, produced by submitting the Beam jar to Flink with:

    /opt/flink/bin/flink run -c org.apache.beam.examples.KafkaWordCount target/word-count-beam-bundled-0.1.jar --runner=FlinkRunner --bootstrapServer=localhost:9092

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@65be88ae
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@65be88ae
        at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:233)
        at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:281)
        at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:157)
        at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:136)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
        at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:116)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
        at org.apache.beam.examples.KafkaWordCount.runWordCount(KafkaWordCount.java:99)
        at org.apache.beam.examples.KafkaWordCount.main(KafkaWordCount.java:106)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        ... 9 more
Caused by: org.apache.flink.api.common.InvalidProgramException: [partitions type:ARRAY pos:0] is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:140)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1470)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
        at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:218)
        ... 32 more
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
        at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
        at java.base/java.util.ArrayList.writeObject(ArrayList.java:896)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1130)
        at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
        at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:122)
        ... 42 more

Update

It turns out there is a known issue in Beam related to running on Flink that appears to be the cause: https://issues.apache.org/jira/browse/BEAM-7478. One comment in particular notes that `flink run` cannot be used with KafkaIO because Avro's Schema.Field is not serializable: https://issues.apache.org/jira/browse/BEAM-7478?focusedCommentId=16902419&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16902419
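The failure mode itself is easy to reproduce in isolation: the bottom of the stack trace shows Flink's ClosureCleaner using plain Java serialization (`ObjectOutputStream.writeObject`), which fails as soon as a Serializable object transitively holds a field whose class does not implement Serializable. A minimal sketch of that situation; `FakeAvroField` and `FakeSource` are hypothetical stand-ins for Avro's Schema.Field and the Kafka source wrapper, not real Beam or Avro types:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCleanerDemo {

    // Stand-in for org.apache.avro.Schema.Field: a plain class that
    // does NOT implement Serializable.
    static class FakeAvroField {
        final String name = "partitions";
    }

    // Stand-in for the source wrapper: Serializable itself, but it
    // transitively holds a non-serializable field, which is exactly
    // what Java serialization (and hence Flink's closure check) rejects.
    static class FakeSource implements Serializable {
        final FakeAvroField partitions = new FakeAvroField();
    }

    // Attempt Java serialization and report what happens.
    static String tryToSerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "serialized OK";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // → NotSerializableException: ClosureCleanerDemo$FakeAvroField
        System.out.println(tryToSerialize(new FakeSource()));
    }
}
```

This is why the `Caused by` chain ends in `java.io.NotSerializableException: org.apache.avro.Schema$Field`: the serialization walk reaches the Avro field list inside the Kafka source and stops there.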

Update 2

As mentioned in the comments, one workaround is to downgrade Flink to 1.8.0.
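In terms of the pom.xml shown above, the downgrade would only change the `flink.version` property (the Beam runner artifact name stays the same for the 1.8.x line); a sketch, assuming the same property layout:

```xml
<properties>
  <beam.version>2.14.0</beam.version>
  <flink.artifact.name>beam-runners-flink-1.8</flink.artifact.name>
  <!-- Downgraded from 1.8.1: 1.8.1 triggers the Schema.Field
       serialization failure described above, 1.8.0 does not. -->
  <flink.version>1.8.0</flink.version>
</properties>
```

The local standalone cluster would presumably also need to run Flink 1.8.0 so that the submitted job matches the cluster version.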

Tags: java, apache-kafka, apache-flink, apache-beam
