首页 > 解决方案 > Spring Cloud Kafka Streams 基于 Header 信息的动态消息转换

问题描述

我正在尝试使用 Spring Cloud Kafka Streams 处理来自包含不同类型消息的 Kafka 主题的消息。例如,我们从主题收到一条 JSON 消息,它可以是 A 类消息,也可以是 B 类消息。生产者在标头中添加消息类型,有没有办法在功能绑定器中读取该标头信息并相应地转换消息?或者还有一个“选择”选项,用于在消息进入时进行分支,将消息路由到正确的转换器?

标签: spring-kafkaspring-cloud-streamspring-cloud-function

解决方案


如果将绑定配置为 use nativeDecoding,则反序列化由 Kafka 完成(通过value.deserializerconsumer 属性)。

spring-kafka 提供了一个JsonDeserializer在特定头文件中查找类型信息(由相应的JsonSerializer.

它还提供了一个DelegatingDeserializer允许您根据spring.kafka.serialization.selector标头中的值选择要使用的解串器。

有关更多信息,请参阅Spring for Apache Kafka 参考手册


推荐阅读