apache-kafka - spring cloud stream自定义值反序列化器不起作用
问题描述
我有这个简单的spring cloud stream函数
@Configuration
public class ItemProcessor {
@Bean
public Serde<Wish> WishSerde(){
Serde<Wish> wishSerde = DebeziumSerdes.payloadJson(Wish.class);
wishSerde.configure(Collections.singletonMap("from.field", "after"), false);
return wishSerde;
}
@Bean
public Serde<Long> KeySerde(){
final Serde<Long> keySerde = DebeziumSerdes.payloadJson(Long.class);
keySerde.configure(Collections.emptyMap(), true);
return keySerde;
}
@Bean
public Function<KStream<Long, Wish>, KStream<Long, Wish>> processItems() {
return (models) -> models
.peek((k, v) -> System.out.println(k + ": " + v));
}
}
这是Wish模型
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Wish {
public long wish_id;
public long user_id_fk;
public long item_id_fk;
public long wish_status;
}
和 application.yml
spring.cloud:
function.definition: processItems
stream:
bindings:
processItems-in-0:
destination: source.wish
processItems-out-0:
destination: processed.wish
kafka:
streams:
binder:
brokers: 127.0.0.1:9092
我正在使用文档中描述的 Serde 配置 bean,并使用此处描述的Debezium JsonSerde来反序列化由 Debezium 创建的对象。
传入的消息是这样的:
{"wish_id":759}|{"before":null,"after":{"wish_id":759,"user_id_fk":2,"item_id_fk":823,"wish_status":1},"source":{"version":"1.6.0.Final","connector":"mysql","name":"JDP","ts_ms":1635151905000,"snapshot":"false","db":"jdb","sequence":null,"table":"wish","server_id":1,"gtid":null,"file":"mysql-bin.000008","pos":2694699,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1635151886089,"transaction":null}
其中键和值由 分隔|
。
我需要该'after'
字段的内容用作Wish
模型的数据,并且这个带有此配置的 Serde 假设可以做到这一点。但我在运行时收到以下错误:
Exception in thread "processItems-applicationId-062d6a97-b543-47ea-b938-b1b520a6faa8-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:895)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:812)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)
Caused by: java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "before" (class ir.jdro.kafkaStream.rdbAggregator.model.Wish), not marked as ignorable (4 known properties: "wish_status", "wish_id", "user_id_fk", "item_id_fk"])
at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: ir.jdro.kafkaStream.rdbAggregator.model.Wish["before"])
at io.debezium.serde.json.JsonSerde$JsonDeserializer.deserialize(JsonSerde.java:95)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
一旦我设置"unknown.properties.ignored", true
了 Serde 配置并防止应用程序引发异常,我会得到以下输出结果:
759|{"wish_id":0,"user_id_fk":0,"item_id_fk":0,"wish_status":0}
这表明密钥反序列化工作正常,但值反序列化却不行。
我找不到我错在哪里!
解决方案
推荐阅读
- asp.net-core - ADFS 不会将声明提供者的 Ws-Fed 响应中的声明传递给 RP 的传出 SAML 响应
- json - H18 错误,sock=backen。响应中途连接中断
- sql - SQL Server 2012。根据第 x+1 行 - 第 x 行的差异递增一个整数
- kubernetes-helm - Helm (v3) 可以创建 apiVersion 1 图表吗?
- .net - 使用 Asp.net mvc 以编程方式创建 Azure 服务总线
- discord.py-rewrite - 等待 member.remove_roles(成员,角色)
- html - 标签的 margin-bottom 设置在表格中给出了奇怪的对齐方式
- javascript - 哪个更好
我一直在使用带有脚本标签的 defer 属性,但刚刚发现 <link rel="prefetch" as="script" 是一个东西,但找不到更好或首选的东西?这些方法之间有什么区别?
<link rel="prefetch" href="library.js" as="script">
<- javascript - JavaScript 中的 Array.filter() 方法
- amazon-web-services - 从移动应用程序将图像上传到 AWS S3