spring-kafka - 并发设置为大于1时如何暂停特定的kafka消费者线程?
问题描述
我正在使用 spring-kafka 2.2.8 并将并发设置为 2,如下所示,并试图了解在满足特定条件时如何暂停消费者线程/实例。
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2)
public void listen(String in) {
System.out.println(in);
}
现在,我有两个问题。
我的消费者会跨越两个不同的轮询线程来轮询记录吗?
如果我为消费者设置一个 id,如上所示。如何暂停特定的消费者线程(并发设置为 1 以上)。
请建议。
解决方案
使用该KafkaListenerEndpointRegistry.getListenerContainer(id)
方法获取对容器的引用。
将其转换为 aConcurrentMessageListenerContainer
并调用getContainers()
以获取子KafkaMessageListenerContainer
s 的列表;然后,您可以单独暂停/恢复它们。
您可以使用getAssignedPartitions()
.
推荐阅读
- python - 设置 docker-compose 端口和主机名以使用 kafka-python 容器
- powershell - 如何在 PowerShell 中使用 ESXCLI 更新 ESXi 6.5 补丁
- c++ - 我试图从另一个函数抛出异常,但它崩溃并给了我未处理的异常。我应该怎么做?C++
- python - 从每个类别的某个布尔列(从最后一个)的最后一次出现计算时间增量?
- extjs - Extjs 6.2.0。如何使用锁定列更改网格中的标题文本
- javascript - 关于消息错误 bot.on('message', async (msg) => { if (msg.author.bot) return; await msg.replay() });
- c - 用户输入字符串后如何检查特定字符?
- c# - 这里使用什么类型的构造函数/方法?
- python - 使用 factory_boy 自定义创建相关子工厂
- php - Composer 不索引类