spring - 如何将消息从单个生产者发布到两个主题并在 Kafka 的单个侦听器中使用
问题描述
我正在尝试从单个生产者发布两个不同主题的消息。
在这里,我创建了两个主题:
@Bean
public NewTopic multi1() {
return TopicBuilder.name("multi1").partitions(1).build();
}
@Bean
public NewTopic multi2() {
return TopicBuilder.name("multi2").partitions(1).build();
}
这就是我将消息发送到两个主题的方式:
public void sendingtomultitopic()
{
IntStream.range(0, 100).forEach(i->this.template.send("multi1", "mutli1 data value->"+i));
IntStream.range(0, 100).forEach(i->this.template.send("multi2", "multi2 data value->"+i));
logger.info("sending finished");
}
以上方法是发送到多个主题的正确方法吗?
在下面,我正在尝试使用消息
@KafkaListener(topics = {"multi1,multi2"}, groupId = "diffgroupid3")
public void consumingfrommultitopics(String data) {
logger.info(String.format("consumingfromtwotopics -> %s", data));
}
我收到异常:
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [multi1,multi2]
我能够在这两个主题中发布数据。
但是这个消费者没有检索任何消息,请在这里帮助我?
解决方案
您可以从配置(application.yml)中使用
test:
kafka:
sender:
serverAddress: xxxxx:port
topic:
consumer: topic1,topic2
@KafkaListener(topics = "#{'${test.kafka.topic}'.split(',')}"
public void consumingfrommultitopics(String data) {
logger.info(String.format("consumingfromtwotopics -> %s", data));
}
推荐阅读
- python - 在 python 脚本上检索 Parameters.scriptPath
- python - 如何遍历元组列表?
- sql - 如何修复 SWITCH 函数中的语法错误
- c# - IQueryable ToListAsync 在使用自定义结构时抱怨违反约束
- regex - SED:为什么替换不能使用 $ 来指示行尾
- json - 在 oracle 中解析 json 数组时获取重复项
- javascript - 如何检测每个视频标签以及它是否正在播放?
- javascript - 无法在 HTML 页面上显示 getJson 的结果
- python - 如何在 excel 文档中搜索特定单词,并使用搜索结果构建新列
- python - 无法使用`loc`向`pandas.DataFrame`子类添加行