首页 > 解决方案 > 并发设置为大于1时如何暂停特定的kafka消费者线程?

问题描述

我正在使用 spring-kafka 2.2.8 并将并发设置为 2,如下所示,并试图了解在满足特定条件时如何暂停消费者线程/实例。

@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2)
    public void listen(String in) {
        System.out.println(in);
    }

现在,我有两个问题。

  1. 我的消费者会跨越两个不同的轮询线程来轮询记录吗?

  2. 如果我为消费者设置一个 id,如上所示。如何暂停特定的消费者线程(并发设置为 1 以上)。

请建议。

标签: spring-kafka

解决方案


使用该KafkaListenerEndpointRegistry.getListenerContainer(id)方法获取对容器的引用。

将其转换为 aConcurrentMessageListenerContainer并调用getContainers()以获取子KafkaMessageListenerContainers 的列表;然后,您可以单独暂停/恢复它们。

您可以使用getAssignedPartitions().


推荐阅读