spring - Spring Cloud Stream 中 Kafka 标头上的条件(基于内容)路由
问题描述
我在 Spring Boot 2.x 应用程序中使用 Spring Cloud Stream 3.x 来使用来自 Kafka 主题的消息。
我希望有一个侦听器,根据doc有条件地根据某些自定义标头值使用消息:
@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
LOGGER.info("Received: {}", message);
}
但是侦听器永远不会收到通知,如果条件被删除,我会在日志中看到以下内容:
Received: ... SomeHeader: [B@1055e4af ...
事实证明,自定义标头以 Kafka 字节数组原始格式保留,使它们不符合条件评估的条件。
是否需要一些额外的配置或者我错过了什么?
解决方案
在对源代码和stackoveflow进行了一些挖掘之后,我发现了以下内容:
- Spring Cloud Stream 委托 Spring Kafka 消息和标头转换(KafkaMessageChannelBinder ~ getHeaderMapper)
- 默认情况下,标题保留为原始格式标题转换实现(BinderHeaderMapper)
- Spring Cloud Stream 允许自定义标头映射,尤其是将标头从字节数组转换为字符串(如何在我的 Spring Cloud Stream 项目中将传入的标头映射为 String 而不是 byte[]?)
所以我添加了我的自定义标头映射器 bean(bean 名称很重要,它允许省略其他配置属性),它将我的自定义标头映射到字符串:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setRawMappedHeaders(Map.of(
"SomeHeader", true
));
return headerMapper;
}
这解决了问题:
Received: ... SomeHeader: SomeHeaderValue ...
PS这似乎是Spring Cloud Stream中的一个错误:
- 它引入了自己的头部映射器(BinderHeaderMapper)实现,但后者不尊重条件路由特性。
- 标头映射器是 KafkaMessageChannelBinder 中的子类,此添加的行为不明显,如果提供自定义标头映射器,则会丢失。
推荐阅读
- scala - 使用sttp将响应保存为scala中的文件
- c++ - 为什么插入 std::unordered_set 调用复制构造函数?
- c# - JsonIgnore 属性在抽象类中不起作用
- botframework - Bot Framework .dialog 文件和 CodeAction
- web-scraping - page.close() 不会停止 ui4j 活动和网页的定期重新加载
- javascript - JS:如何管理 POST 请求队列
- npm - 猜猜是不是digitalocean中的npm冲突和pm2状态错误
- django - 为什么它 urlpatterns 给我页面错误?
- python - MultiIndex 数据帧的列差异
- c++ - 不一致的操作数约束