首页 > 解决方案 > 如何将消息从单个生产者发布到两个主题并在 Kafka 的单个侦听器中使用

问题描述

我正在尝试从单个生产者发布两个不同主题的消息。

在这里,我创建了两个主题:

   @Bean
    public NewTopic multi1() {
        return TopicBuilder.name("multi1").partitions(1).build();
    }
    @Bean
    public NewTopic multi2() {
        return TopicBuilder.name("multi2").partitions(1).build();
    }

这就是我将消息发送到两个主题的方式:

public void sendingtomultitopic()
{
    IntStream.range(0, 100).forEach(i->this.template.send("multi1", "mutli1 data value->"+i));      
    IntStream.range(0, 100).forEach(i->this.template.send("multi2", "multi2 data value->"+i));
    logger.info("sending finished");
}

以上方法是发送到多个主题的正确方法吗?

在下面,我正在尝试使用消息

@KafkaListener(topics = {"multi1,multi2"}, groupId = "diffgroupid3")
    public void consumingfrommultitopics(String data) {
        logger.info(String.format("consumingfromtwotopics -> %s", data));
    }

我收到异常:

org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [multi1,multi2]

我能够在这两个主题中发布数据。

但是这个消费者没有检索任何消息,请在这里帮助我?

标签: springspring-bootapache-kafkakafka-consumer-apispring-kafka

解决方案


您可以从配置(application.yml)中使用

test:
  kafka:
    sender:
      serverAddress: xxxxx:port
    topic:
      consumer: topic1,topic2

@KafkaListener(topics = "#{'${test.kafka.topic}'.split(',')}"
  public void consumingfrommultitopics(String data) {
        logger.info(String.format("consumingfromtwotopics -> %s", data));
    }

推荐阅读