首页 > 解决方案 > spring cloud stream kafka binder无法以功能样式消费消息

问题描述

我一直在使用@StreamListener 来消费单个消息,现在我想批量消费。根据文档,@StreamListener 不适用于批处理,因此转向功能样式,但我无法使用该消息。当我运行应用程序时,我没有看到这些 kafka 属性集,就像我看到的其他使用 @Streamlistener 的消费者一样

这是我的代码:

@Component
@Slf4j
@Configuration
@RequiredArgsConstructor
@Profile({"processing"})
public class ActivityBatchSubscriber {

  public final ActivityBatchProcessor activityBatchProcessor;
 

  @Bean
  public Consumer<List<Message<Event>>> nodeConfigEvents() {
  
     return messages -> {
         try {
            log.info("Received activity message with message length {} attempt",
            messages.size());
            Message<Event> eventGenericMessage = messages.get(0);
            Event payload = eventGenericMessage.getPayload();
            log.info("payload" + payload);
             List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
          activityBatchProcessor.processActivity(eventList);

        } catch (Exception e) {
           log.error(e.getMessage());
        }
     };
   }
}

在“处理”配置文件中,具有这些属性。

spring.cloud.stream.function.definition=nodeConfigEvents
spring.cloud.stream.bindings.nodeConfigEvents-in-0.destination=TOPIC-NAME
spring.cloud.stream.bindings.nodeConfigEvents-in-0.contentType=application/json
spring.cloud.stream.bindings.nodeConfigEvents-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.nodeConfigEvents-in-0.group=LOCAL1
spring.cloud.stream.bindings.nodeConfigEvents-in-0.consumer.auto-startup=true

spring-cloud-stream-kafka 版本---> 3.0.x

我究竟做错了什么?流绑定没有注册这个功能模型,我看到这个问题,有类似的问题,虽然在评论中@Oleg 提到它在 3.0 快照上是固定的,我使用的是 3.0.11。

我的应用程序正在记录此信息,

Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration

我不能在同一个项目中同时使用两者吗?尽管它们都在不同的配置文件中。

@Oleg Zhurakousky @Gary

标签: javaspringapache-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


推荐阅读