首页 > 解决方案 > Kafka - 了解所有消费者何时执行寻求最新

问题描述

我在一个消费者组中有多个消费者(单个 Spring Boot 应用程序)。我使用onPartitionsAssigned回调将每个消费者偏移重置为 LATEST(对于分配的分区)。

问题是我需要知道消费者组中的所有消费者何时执行此搜索并在此之后执行一些逻辑。

我目前的理解是,每个消费者都独立执行搜索和开始处理——即没有任何共同的同步点。

如果有人可以提供指导,这是否可能,那就太好了。

更新。让我解释一下为什么需要它。我有一个逻辑来发送 HTTP 请求(到另一个服务)以请求将一些数据提交给 Kafka。但在发送此请求之前,我需要确保所有消费者都已经处于最新的偏移量。因为如果这个请求被提前发送 - 这个第二个服务提交的一些数据可能会丢失 - 即如果它是在消费者完成重置到 LATEST 偏移量之前提交的。

标签: apache-kafkaspring-kafka

解决方案


没有内置任何东西;您可以将 CountDownLatch 设置为容器的并发,并为每个容器倒计时。

如果并发可能发生变化,您可以从KafkaListenerEndpointRegistrybean 获取对并发容器的引用。

int count = ((ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(id))
    .getConcurrency();

推荐阅读