首页 > 解决方案 > 控制 kafka 听众的消费

问题描述

除了之前问过的关于交易的问题,我想问一下控制消费:我有一个监听器,它处理生产数据。现在发生了一些不好的事情,出于任何原因,我们希望我们的应用程序启动,但停止处理记录。所以我想选择手动(开始)停止消费者(我知道 ContainerStoppingErrorHandler)。问题解决后,最终重新定位它们并重新启动它们。

我想我看到了一种方法来做到这一点,但我希望有人可以向我确认这一点,因为可能有很多陷阱。整个过程似乎并不容易,我不确定我是否做对了,也许有更好的方法。

所以首先,为了能够暂停/停止消费者,我必须有权访问 MessageListenerContainer。这意味着,我将在配置中创建:ConcurrentKafkaListenerContainerFactory 并(从 2.2 开始)使用它来创建 ConcurrentMessageListenerContainer 的托管 bean。然后可以使用这个 bean 来启动/停止消费者。作品。一旦它是并发的......我假设我传递给 setupMessageListener 的内容必须是无状态类的实例,以便可以从多个线程/消费者对其进行操作。因此,如果我想进行 spring 依赖注入,就像我之前在带有 @KafkaListener 注释方法的 bean 上所做的那样,我可以在这里传递无状态单例 bean 的实例。

现在关于重新定位:这似乎很容易。只需在通过 setupMessageListener 注册的 messageListener 类中实现 ConsumerSeekAware 并存储回调。然后你可以自动装配 ConsumerSeekAware messageListener 单例,然后进行搜索。相关的 MessageListenerContainer,无论分区/并发设置如何,都应该进行查找。因为当 ConcurrentMessageListenerContainer 以大于 1 的并发开始时,它会启动多个 KafkaMessageListenerContainer a)在所有分区上(?),但由于所有分区共享相同的 groupId,因此只有一个消费者将消费消息,或者 b)每个 KafkaMessageListenerContainer 将有一些分区子集. 但是在 seek 中,我们必须指定 topic+partition+offset,所以在任何一种情况下,seek 都应该被适当的 KafkaMessageListenerContainer 拾取。

对?

我知道这是令人生畏的文本/问题,但也许这对其他人也有用。

标签: spring-kafka

解决方案


您不需要创建自己的容器

@KafkaListener(id = "foo", topics = " ... "

然后,获取对KafkaListenerEndpointRegistry(自动线等)的引用。

然后

registry.getListenerContainer("foo").stop();

或者

registery.stop();

停止所有@KafkaListeners。

当并发> 1时;您至少需要那么多分区,否则一些消费者将处于空闲状态。

分区由 Kafka 分布在消费者之间;每个人都会调用回调,并带有已分配给它的分区列表。


推荐阅读