首页 > 解决方案 > 如何使用 Spring Boot 在一个消费者类中顺序读取 2 个 Kafka 主题?

问题描述

我有 2 个 Kafka 主题 A 和 B。我希望每次我的消费者启动时,它都会首先转到主题 A。主题 A 包含有关主题 B 的信息,我将依赖该信息从主题 B 中获取数据。

所以本质上我需要先阅读主题 A,然后再阅读主题 B,我只需要在每次重新启动程序时执行一次。

我能想到的事情如下:

@KafkaListener(topics = {"A" , "B"})

或者:

@KafkaListener(topics = "A")
public void receive() {}

@KafkaListener(topics = "B")
public void receive() {}

两者都不保证阅读的顺序。

如何强制我的程序首先阅读主题 A,并且在完成主题 A 的最新更新后才转到主题 B?

标签: javaspring-bootapache-kafkaspring-kafka

解决方案


使用这样的东西...

@KafkaListener(id = "bReceiver", autoStartup = "false, topics = "B")
public void receive() {}

设置idleEventInterval并添加

@Autowired
KafkaListenerEndpointRegistry registry;

@EventListener(condition = "event.listenerId == 'aReceiver`)
public void eventListener(ListenerContainerIdleEvent event) {
    this.registry.getListenerContainer("aReceiver").stop(() -> { });
    this.registry.getListenerContainer("bReceiver").start();
}

推荐阅读