首页 > 解决方案 > 仅在某些条件为真时才使用来自 Kafka 的消息

问题描述

我们有特定的主题,只有在条件 consumeEnabled=true 时才需要消费消息。所以,它应该像这样工作:

  1. 如果应用程序正在启动并且consumeEnabled=true,则将分区分配给消费者并从主题消费消息。
  2. 如果应用程序正在启动并且consumeEnabled=false,则不要将分区分配给消费者并且不要使用来自主题的消息。
  3. 如果应用程序已经以 consumeEnabled=false 运行,但在运行时属性变为 consumeEnabled=true,则在运行时将分区分配给消费者并从主题消费消息。

应用程序正在使用消息,但随后将 consumeEnabled 变为 false 的情况,无需考虑。

请帮助定义使用 Spring Kafka 和/或 Kafka Java 客户端实现决策的最佳方式

标签: javaapache-kafkaspring-kafka

解决方案


如果你正在使用@KafkaListener那么

@KafkaListener(id = "foo", ... , autoStartup="${consume.enabled}")

属性在哪里consume.enabled

要在运行时启动/停止容器,请使用KafkaListenerEndpointRegistrybean。

registry.getListenerContainer("foo").start();

推荐阅读