Apache Beam Kafka IO for JSON messages - org.apache.kafka.common.errors.SerializationException

Problem Description

I am trying to get familiar with Apache Beam Kafka IO and I am running into the following error:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at com.andrewjones.KafkaConsumerExample.main(KafkaConsumerExample.java:58)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

Below is the piece of code that reads messages from a Kafka topic. I would appreciate it if someone could provide some pointers.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerJsonExample {
    static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(options);

    p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("myTopic2")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)

            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))

            // We're writing to a file, which does not support unbounded data sources. This line makes it bounded to
            // the first 5 records.
            // In reality, we would likely be writing to a data source that supports unbounded data, such as BigQuery.
            .withMaxNumRecords(5)

            .withoutMetadata() // PCollection<KV<Long, String>>
    )
            .apply(Values.<String>create())

            .apply(TextIO.write().to("wordcounts"));
    System.out.println("running data pipeline");
    p.run().waitUntilFinish();
}

}

Tags: google-cloud-platform, apache-kafka, google-cloud-dataflow, kafka-consumer-api, apache-beam

Solution


The problem is caused by using a LongDeserializer for keys that appear to have been serialized by something other than a LongSerializer; which serializer was used depends on how you produced the records. The error message ("Size of data received by LongDeserializer is not 8") says exactly that: the key bytes of a consumed record are not the fixed 8 bytes that a serialized Long occupies.
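For example, if the records were produced with string keys, as in the minimal sketch below (the producer configuration is an assumption; the question does not show how the records were written), the keys on the wire are variable-length UTF-8 bytes rather than 8-byte longs, and the LongDeserializer rejects them:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class StringKeyProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Hypothetical producer config: keys and values are both serialized
        // as strings, so a consumer using LongDeserializer for keys will see
        // variable-length byte arrays instead of the fixed 8 bytes it expects.
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("myTopic2", "someKey", "{\"name\":\"test\"}"));
        }
    }
}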

So you should either use the appropriate deserializer for the keys or, if the key doesn't matter to you, as a workaround, try reading the keys with a StringDeserializer.
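A minimal sketch of the workaround applied to the read in the question (only the type parameters and the key deserializer change; everything else is taken from the original code):

p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("myTopic2")
        // Read the key as a String; StringDeserializer accepts key bytes of
        // any length, unlike LongDeserializer, which requires exactly 8.
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
        .withMaxNumRecords(5)
        .withoutMetadata()) // PCollection<KV<String, String>>
        .apply(Values.<String>create())
        .apply(TextIO.write().to("wordcounts"));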

