首页 > 解决方案 > 无法为多个消费者订阅 Spring Kafka Test 嵌入式 Kafka 代理

问题描述

我试图让两个消费者订阅一个EmbeddedKafkaBroker。第一个成功了,第二个失败了。@EmbeddedKafka和经纪人都@ClassRule失败了。

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(topics = { "topic" })
public class AnnotationEmbeddedKafkaTest {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    public void annotationEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ClassRuleEmbeddedKafkaTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, false, "topic");

    private EmbeddedKafkaBroker broker = embeddedKafkaRule.getEmbeddedKafka();

    @Test
    public void classRuleEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}

我期待两个消费者可以订阅一个EmbeddedKafkaBroker。春季卡夫卡测试有可能吗?

我在这里复制了这个:https ://github.com/yraydhitya/spring-kafka-test-multiple-consumers

标签: springspring-kafkaspring-kafka-test

解决方案


如果您希望两个消费者都接收来自主题的所有消息,您需要他们成为不同消费者组的一部分,例如:

Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded1", "false", broker);

Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded2", "false", broker);

否则,每个消费者将被分配到主题的不同分区,并且由于您的嵌入式 kafka 主题(默认情况下)只有一个分区,因此只会分配一个消费者。


推荐阅读