首页 > 解决方案 > Spring KafkaEmbedded - 消费消息的问题

问题描述

我在使用来自https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test/2.1.10.RELEASE的 KafkaEmbedded 时遇到问题

我正在使用KafkaEmbedded创建 Kafka 代理来测试生产者/消费者管道。这些生产者/消费者是来自 kafka-clients 的标准客户。我没有使用 Spring Kafka 客户端。

一切正常,代码工作正常,但我必须使用consumeFromEmbeddedTopics()方法 fromKafkaEmbedded让消费者工作。如果我不使用此方法,消费者将不会收到任何消息。

这种方法有两个问题:首先,它需要KafkaConsumer作为参数(我不想在类中公开它),并且ConcurrentModificationException当对象调用 poll 时调用此方法会给出@Scheduled.

我正在使用auto.offset.reset财产,所以这是另一回事。

我的问题是:如何在不调用这些方法的情况下正确使用来自 KafkaEmbedded 的记录consumeFromEmbeddedTopics()

标签: apache-kafkaspring-testspring-kafka

解决方案


该方法没有什么特别之处,它只是将消费者订阅到主题并对其进行轮询。

你没有理由不能对你的消费者做同样的事情。


推荐阅读