spring - EmbeddedKafka 和 Spring Cloud Stream 无法正常工作
问题描述
我对卡夫卡很陌生。
我正在尝试为 Kafka Producer 编写集成测试。我在项目中还有 Spring Cloud Stream Function 以及 Avro 序列化器,所以我的生产者看起来像:
@Component
public class EventSender {
private final Sinks.Many<MyAvroObject> processor = Sinks.many().multicast().onBackpressureBuffer();
public void sendEvent(MyObject object) {
MyAvroObject avroMapped = ... // mapping from model to avro object
processor.emitNext(avroMapped, Sinks.EmitFailureHandler.FAIL_FAST);
}
@Bean
public Supplier<Flux<MyAvroObject>> myObjectSupplier() {
return this.processor::asFlux;
}
}
我的测试类看起来像:
@Slf4j
@SpringBootTest(classes = ApiApplication.class,
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers},spring.cloud.function.definition=myObjectSupplier")
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = KafkaSenderTest.INPUT_TOPIC_NAME, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class KafkaSenderTest {
static final String INPUT_TOPIC_NAME = "test-topic";
@Autowired
private EventSender eventSender;
private KafkaMessageListenerContainer<String, MyAvroObject> container;
private BlockingQueue<ConsumerRecord<String, String>> consumerRecords;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@BeforeEach
public void setUp() {
consumerRecords = new LinkedBlockingQueue<>();
KafkaTestUtils.producerProps(embeddedKafkaBroker);
ContainerProperties containerProperties = new ContainerProperties(INPUT_TOPIC_NAME);
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(
"qof", "false", embeddedKafkaBroker);
consumerProperties.put("auto.offset.reset", "earliest");
DefaultKafkaConsumerFactory<String, MyAvroObject> consumer = new DefaultKafkaConsumerFactory<>(consumerProperties);
container = new KafkaMessageListenerContainer<>(consumer, containerProperties);
container.setupMessageListener((MessageListener<String, String>) record -> {
log.debug("Listened message='{}'", record);
consumerRecords.add(record);
});
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterEach
public void tearDown() {
container.stop();
}
@Test
public void test() throws InterruptedException {
MyObject obj = new MyObject(); //then set some field values
eventSender.sendEvent(obj);
ConsumerRecord<String, String> record = consumerRecords.poll(5, TimeUnit.SECONDS);
Assertions.assertThat(record).isNotNull();
}
}
但测试失败,因为没有收到任何消息。我验证了生产者和消费者具有相同的主题和代理地址。看起来 EmbeddedKafka 侦听器没有从供应商那里订阅 Flux。
我有以下依赖项:
testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.6'
testImplementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
testImplementation 'org.springframework.kafka:spring-kafka'
testImplementation "io.confluent:kafka-avro-serializer:${kafkaAvroSerializerVersion}"
testImplementation 'org.springframework.cloud:spring-cloud-stream'
testImplementation 'org.springframework.cloud:spring-cloud-schema-registry-client'
所以我没有spring-cloud-stream-test-support
我知道会导致问题的东西。
在测试中application.properties
我添加了:
spring.cloud.stream.bindings.orderSupplier-out-0.destination=test-topic
由于我的测试类配置,这是多余的,但现在没关系。
我也有主要的 application.yml 配置:
spring:
cloud:
stream:
bindings:
myObjectSupplier-out-0:
destination: ${TOPIC_NAME}
producer:
useNativeEncoding: true
kafka:
binder:
autoCreateTopics: false
brokers: ${KAFKA_URL}
jaas:
loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
options:
username: ${KAFKA_USERNAME}
password: ${KAFKA_PASSWORD}
configuration:
sasl.mechanism: PLAIN
security.protocol: SASL_SSL
ssl.endpoint.identification.algorithm: https
bindings:
myObjectSupplier-out-0:
producer:
configuration:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
basic.auth.credentials.source: USER_INFO
errors.retry.timeout: 3
errors.retry.delay.max.ms: 15000
schema.registry:
url: ${SCHEMA_REGISTRY_URL}
basic.auth.user.info: ${SCHEMA_REGISTRY_API_KEY}:${SCHEMA_REGISTRY_SECRET}
schema-registry-client:
endpoint: ${SCHEMA_REGISTRY_URL}
所以,让我们假设它使用真正的模式注册表。这不利于测试,但现在不是问题。
可能有人可以推荐另一种方法来测试我的制作人吗?我需要一种使用 Spring Cloud Function、Kafka 绑定发送测试消息的方法。
先感谢您。
解决方案
推荐阅读
- c# - Linq 智能感知中缺少 EF 6 实体集
- xamarin.forms - 将 iOS 更新到 15 后列表视图顶部的奇怪空间
- matlab - Matlab Basic拟合线回归误差
- reactjs - 当用户移动到 React 中的另一个页面时,如何使错误消息消失?
- python - 如何根据用户输入使用 python 和烧瓶创建路由?
- go - GroupByKey 始终将所有内容保存在 RAM 中,导致 OOM
- python - For循环正在停止
- java - 在Java中更改实例的属性时如何在其他类中使用它
- postgresql - 使用 DataGrip 在本地运行 Supabase,'relation "public.users" 不存在'
- android - ioUnable to connect with my .Net Server: React Native