spring - 无法为多个消费者订阅 Spring Kafka Test 嵌入式 Kafka 代理
问题描述
我试图让两个消费者订阅一个EmbeddedKafkaBroker
。第一个成功了,第二个失败了。@EmbeddedKafka
和经纪人都@ClassRule
失败了。
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(topics = { "topic" })
public class AnnotationEmbeddedKafkaTest {
@Autowired
private EmbeddedKafkaBroker broker;
@Test
public void annotationEmbeddedKafkaTest() {
Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
System.out.println("consumer1 assignments=" + consumer1.assignment());
Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
System.out.println("consumer2 assignments=" + consumer2.assignment());
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ClassRuleEmbeddedKafkaTest {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, false, "topic");
private EmbeddedKafkaBroker broker = embeddedKafkaRule.getEmbeddedKafka();
@Test
public void classRuleEmbeddedKafkaTest() {
Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
System.out.println("consumer1 assignments=" + consumer1.assignment());
Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
System.out.println("consumer2 assignments=" + consumer2.assignment());
}
}
我期待两个消费者可以订阅一个EmbeddedKafkaBroker
。春季卡夫卡测试有可能吗?
我在这里复制了这个:https ://github.com/yraydhitya/spring-kafka-test-multiple-consumers
解决方案
如果您希望两个消费者都接收来自主题的所有消息,您需要他们成为不同消费者组的一部分,例如:
Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded1", "false", broker);
和
Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded2", "false", broker);
否则,每个消费者将被分配到主题的不同分区,并且由于您的嵌入式 kafka 主题(默认情况下)只有一个分区,因此只会分配一个消费者。
推荐阅读
- python - pyFMI Python模拟不同数量的输出点
- python - 如果 set1 的值 = set2 的键,则将一组中的值替换为另一组中的值?
- r - 标签颜色与使用ggrepel的气泡填充相同
- c# - 在 asp.net mvc 中广播日历
- c# - 为什么我的 MySQL 连接在 Visual Studio 代码 C# 中不起作用?
- node.js - 无法从令牌架构令牌 mongodb express 护照中引用
- c - 将 bool 字段添加到结构时出现语法错误(在 '=' 标记之前应为 ':'、','、';'、'}' 或 '__attribute__')
- sql - 我想创建一个按 id 列分区的窗口函数 row_number 并按代码排序,我应该首先获取 E 和接下来的其余记录
- django - 如何在 Django 中显示数据库中倒数第二个对象?
- json - 如何使用 shell 脚本从远程服务器 SSH 读取 .json 文件