首页 > 解决方案 > spring cloud stream自定义值反序列化器不起作用

问题描述

我有这个简单的spring cloud stream函数

@Configuration
public class ItemProcessor {

    @Bean
    public Serde<Wish> WishSerde(){
        Serde<Wish> wishSerde = DebeziumSerdes.payloadJson(Wish.class);
        wishSerde.configure(Collections.singletonMap("from.field", "after"), false);
        return wishSerde;
    }

    @Bean
    public Serde<Long> KeySerde(){
        final Serde<Long> keySerde = DebeziumSerdes.payloadJson(Long.class);
        keySerde.configure(Collections.emptyMap(), true);
        return keySerde;
    }

    @Bean
    public Function<KStream<Long, Wish>, KStream<Long, Wish>> processItems() {
        return (models) -> models
                .peek((k, v) -> System.out.println(k + ": " + v));
    }
}

这是Wish模型

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Wish {

    public long wish_id;
    public long user_id_fk;
    public long item_id_fk;
    public long wish_status;

}

和 application.yml

spring.cloud:
  function.definition: processItems
  stream:
    bindings:
      processItems-in-0:
        destination: source.wish
      processItems-out-0:
        destination: processed.wish
    kafka:
      streams:
        binder:
          brokers: 127.0.0.1:9092

我正在使用文档中描述的 Serde 配置 bean,并使用此处描述Debezium JsonSerde来反序列化由 Debezium 创建的对象。

传入的消息是这样的:

{"wish_id":759}|{"before":null,"after":{"wish_id":759,"user_id_fk":2,"item_id_fk":823,"wish_status":1},"source":{"version":"1.6.0.Final","connector":"mysql","name":"JDP","ts_ms":1635151905000,"snapshot":"false","db":"jdb","sequence":null,"table":"wish","server_id":1,"gtid":null,"file":"mysql-bin.000008","pos":2694699,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1635151886089,"transaction":null}

其中键和值由 分隔|

我需要该'after'字段的内容用作Wish模型的数据,并且这个带有此配置的 Serde 假设可以做到这一点。但我在运行时收到以下错误:

Exception in thread "processItems-applicationId-062d6a97-b543-47ea-b938-b1b520a6faa8-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:895)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:812)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)
Caused by: java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "before" (class ir.jdro.kafkaStream.rdbAggregator.model.Wish), not marked as ignorable (4 known properties: "wish_status", "wish_id", "user_id_fk", "item_id_fk"])
 at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: ir.jdro.kafkaStream.rdbAggregator.model.Wish["before"])
    at io.debezium.serde.json.JsonSerde$JsonDeserializer.deserialize(JsonSerde.java:95)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)

一旦我设置"unknown.properties.ignored", true了 Serde 配置并防止应用程序引发异常,我会得到以下输出结果:

759|{"wish_id":0,"user_id_fk":0,"item_id_fk":0,"wish_status":0}

这表明密钥反序列化工作正常,但值反序列化却不行。

我找不到我错在哪里!

标签: apache-kafkaspring-cloudapache-kafka-streamsspring-cloud-streamdebezium

解决方案


推荐阅读