google-cloud-platform - Apache Beam Kafka IO for JSON messages - org.apache.kafka.common.errors.SerializationException
Question
I am trying to get familiar with Apache Beam Kafka IO and am getting the following error:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at com.andrewjones.KafkaConsumerExample.main(KafkaConsumerExample.java:58)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
Below is the code that reads messages from the Kafka topic. I would appreciate any pointers.
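import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
// Assumption: ImmutableMap comes from Guava; the original post does not show its imports.
import com.google.common.collect.ImmutableMap;

public class KafkaConsumerJsonExample {
static final String TOKENIZER_PATTERN = "[^\\p{L}]+";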
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("myTopic2")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
// We're writing to a file, which does not support unbounded data sources. This line makes it bounded to
// the first 5 records.
// In reality, we would likely be writing to a data source that supports unbounded data, such as BigQuery.
.withMaxNumRecords(5)
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create())
.apply(TextIO.write().to("wordcounts"));
System.out.println("running data pipeline");
p.run().waitUntilFinish();
}
}
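Answer
The problem is caused by using LongDeserializer for the keys. LongDeserializer expects every key to be exactly 8 bytes, and the error says the keys it received are not, so they appear to have been written by a serializer other than the Long serializer. Which one depends on how you produce the records.
So you can either use the deserializer that matches your producer's key serializer or, if the keys don't matter, try StringDeserializer for the keys as a workaround. In that case the type parameters of the read also need to change, to KafkaIO.<String, String>read().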
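To see why the key bytes matter, here is a minimal sketch that uses only the Java standard library. It is not Kafka's actual source: decodeLongKey and decodeStringKey are hypothetical helpers that mimic the 8-byte length check reported in the error, and the key value "user-1" is made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeySizeDemo {

    // Hypothetical helper: mimics the length check behind the error message.
    // A long is always 8 bytes, so any other length cannot be decoded as one.
    static long decodeLongKey(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "Size of data received is not 8 (got " + data.length + " bytes)");
        }
        return ByteBuffer.wrap(data).getLong();
    }

    // Hypothetical helper: decodes the bytes as UTF-8 text, which accepts any length.
    static String decodeStringKey(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A key produced as a long: exactly 8 bytes.
        byte[] longKey = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        System.out.println("decoded long key: " + decodeLongKey(longKey));

        // A key produced as text (made-up value): 6 bytes in UTF-8.
        byte[] textKey = "user-1".getBytes(StandardCharsets.UTF_8);
        System.out.println("decoded string key: " + decodeStringKey(textKey));

        // Reading the text key as a long fails the length check.
        try {
            decodeLongKey(textKey);
        } catch (IllegalArgumentException e) {
            System.out.println("long decoding failed: " + e.getMessage());
        }
    }
}
```

If the producer writes keys as text, reading them back as text succeeds whatever their length, which is why StringDeserializer works as a workaround. If the keys really are longs, the thing to check is the key serializer configured on the producer side.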