首页 > 解决方案 > 我们可以为不同的主题使用相同的容器吗?

问题描述

我对 Kafka 有两个不同的主题。数据(Json)的数据类型和结构在两个主题中是相同的。我可以为这两个主题使用相同的侦听器容器吗?我尝试使用,并且能够从两个主题中获得输出。

下面是我的配置。

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factoryConfigure.configure(factory, kafkaConsumerFactory);
    factory.setBatchListener(true);
    return factory;
}

这是我的听众。


@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {

    System.out.println("I am from topic1");
    System.out.println(model);

}

@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics( List<Request> model) {

System.out.println("I am from topic2");
System.out.println(model);

}

我没有明确使用任何注释来给出容器的名称。这会导致我遇到任何问题吗?

标签: apache-kafkaspring-kafka

解决方案


不清楚你在问什么;容器工厂将为每个侦听器创建一个不同的容器。

topics属性是一个数组,因此您可以使用

@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", 
    topics = { "${spring.kafka.consumer.topic}",
               "${spring.kafka.consumer.topic2}" },
    groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {

    System.out.println("I am from both topics");
    System.out.println(model);

}

如果您需要知道每条记录来自哪个主题,您可以使用

public void getTopics( List<Request> model,
     @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {

索引 n 处的主题用于模型索引 n 处的记录。


推荐阅读