首页 > 解决方案 > Spring Cloud Stream 中 Kafka 标头上的条件(基于内容)路由

问题描述

我在 Spring Boot 2.x 应用程序中使用 Spring Cloud Stream 3.x 来使用来自 Kafka 主题的消息。

我希望有一个侦听器,根据doc有条件地根据某些自定义标头值使用消息:

@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
  LOGGER.info("Received: {}", message);
}

但是侦听器永远不会收到通知,如果条件被删除,我会在日志中看到以下内容:

Received: ... SomeHeader: [B@1055e4af ...

事实证明,自定义标头以 Kafka 字节数组原始格式保留,使它们不符合条件评估的条件。

是否需要一些额外的配置或者我错过了什么?

标签: springspring-bootapache-kafkaspring-kafkaspring-cloud-stream

解决方案


在对源代码和stackoveflow进行了一些挖掘之后,我发现了以下内容:

所以我添加了我的自定义标头映射器 bean(bean 名称很重要,它允许省略其他配置属性),它将我的自定义标头映射到字符串:

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
    headerMapper.setRawMappedHeaders(Map.of(
        "SomeHeader", true
    ));
    return headerMapper;
}

这解决了问题:

Received: ... SomeHeader: SomeHeaderValue ...

PS这似乎是Spring Cloud Stream中的一个错误:

  1. 它引入了自己的头部映射器(BinderHeaderMapper)实现,但后者不尊重条件路由特性。
  2. 标头映射器是 KafkaMessageChannelBinder 中的子类,此添加的行为不明显,如果提供自定义标头映射器,则会丢失。

推荐阅读