spring-kafka - 控制 kafka 听众的消费
问题描述
除了之前问过的关于交易的问题,我想问一下控制消费:我有一个监听器,它处理生产数据。现在发生了一些不好的事情,出于任何原因,我们希望我们的应用程序启动,但停止处理记录。所以我想选择手动(开始)停止消费者(我知道 ContainerStoppingErrorHandler)。问题解决后,最终重新定位它们并重新启动它们。
我想我看到了一种方法来做到这一点,但我希望有人可以向我确认这一点,因为可能有很多陷阱。整个过程似乎并不容易,我不确定我是否做对了,也许有更好的方法。
所以首先,为了能够暂停/停止消费者,我必须有权访问 MessageListenerContainer。这意味着,我将在配置中创建:ConcurrentKafkaListenerContainerFactory 并(从 2.2 开始)使用它来创建 ConcurrentMessageListenerContainer 的托管 bean。然后可以使用这个 bean 来启动/停止消费者。作品。一旦它是并发的......我假设我传递给 setupMessageListener 的内容必须是无状态类的实例,以便可以从多个线程/消费者对其进行操作。因此,如果我想进行 spring 依赖注入,就像我之前在带有 @KafkaListener 注释方法的 bean 上所做的那样,我可以在这里传递无状态单例 bean 的实例。
现在关于重新定位:这似乎很容易。只需在通过 setupMessageListener 注册的 messageListener 类中实现 ConsumerSeekAware 并存储回调。然后你可以自动装配 ConsumerSeekAware messageListener 单例,然后进行搜索。相关的 MessageListenerContainer,无论分区/并发设置如何,都应该进行查找。因为当 ConcurrentMessageListenerContainer 以大于 1 的并发开始时,它会启动多个 KafkaMessageListenerContainer a)在所有分区上(?),但由于所有分区共享相同的 groupId,因此只有一个消费者将消费消息,或者 b)每个 KafkaMessageListenerContainer 将有一些分区子集. 但是在 seek 中,我们必须指定 topic+partition+offset,所以在任何一种情况下,seek 都应该被适当的 KafkaMessageListenerContainer 拾取。
对?
我知道这是令人生畏的文本/问题,但也许这对其他人也有用。
解决方案
您不需要创建自己的容器
@KafkaListener(id = "foo", topics = " ... "
然后,获取对KafkaListenerEndpointRegistry
(自动线等)的引用。
然后
registry.getListenerContainer("foo").stop();
或者
registery.stop();
停止所有@KafkaListener
s。
当并发> 1时;您至少需要那么多分区,否则一些消费者将处于空闲状态。
分区由 Kafka 分布在消费者之间;每个人都会调用回调,并带有已分配给它的分区列表。
推荐阅读
- apache - 使用 wsgi 部署时,绘图不从 sqlite db 加载
- python - Python - 无法匹配列表中的值
- ios - 无法推断通用参数“T” Xcode 11 iOS 13
- javascript - 当 body 函数进行条件检查时,应该从 useEffect 钩子返回什么?
- search - 如何使用rest api颤振进行搜索功能
- node.js - 事件发射器Nodejs的单元测试?
- azure-data-factory - Azure datafactory - 触发窗口翻转被阻止
- azure-cosmosdb - Azure cosmos db 支持阿拉伯语、日语、中文、阿拉伯语、西班牙语、法语 CRUD
- flutter - Flutter - 如何检查应用程序是否有新版本?
- laravel - Laravel - 将数组验证为现有数组和 null