EmbeddedKafka and Spring Cloud Stream do not work together

Problem description

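I am new to Kafka.

I am trying to write an integration test for a Kafka producer. The project also uses Spring Cloud Stream with Spring Cloud Function and an Avro serializer, so my producer looks like this: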

@Component
public class EventSender {

  private final Sinks.Many<MyAvroObject> processor = Sinks.many().multicast().onBackpressureBuffer();

  public void sendEvent(MyObject object) {
    MyAvroObject avroMapped = ... // mapping from model to avro object

    processor.emitNext(avroMapped, Sinks.EmitFailureHandler.FAIL_FAST);

  }

  @Bean
  public Supplier<Flux<MyAvroObject>> myObjectSupplier() {
    return this.processor::asFlux;
  }
}

My test class looks like this:

@Slf4j
@SpringBootTest(classes = ApiApplication.class,
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers},spring.cloud.function.definition=myObjectSupplier")
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = KafkaSenderTest.INPUT_TOPIC_NAME, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class KafkaSenderTest {

  static final String INPUT_TOPIC_NAME = "test-topic";

  @Autowired
  private EventSender eventSender;

  private KafkaMessageListenerContainer<String, MyAvroObject> container;

  private BlockingQueue<ConsumerRecord<String, String>> consumerRecords;

  @Autowired
  private EmbeddedKafkaBroker embeddedKafkaBroker;

  @BeforeEach
  public void setUp() {
    consumerRecords = new LinkedBlockingQueue<>();

    KafkaTestUtils.producerProps(embeddedKafkaBroker);

    ContainerProperties containerProperties = new ContainerProperties(INPUT_TOPIC_NAME);

    Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(
        "qof", "false", embeddedKafkaBroker);
    consumerProperties.put("auto.offset.reset", "earliest");

    DefaultKafkaConsumerFactory<String, MyAvroObject> consumer = new DefaultKafkaConsumerFactory<>(consumerProperties);

    container = new KafkaMessageListenerContainer<>(consumer, containerProperties);
    container.setupMessageListener((MessageListener<String, String>) record -> {
      log.debug("Listened message='{}'", record);
      consumerRecords.add(record);
    });
    container.start();

    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
  }

  @AfterEach
  public void tearDown() {
    container.stop();
  }

  @Test
  public void test() throws InterruptedException {

    MyObject obj = new MyObject(); //then set some field values
    eventSender.sendEvent(obj);
    ConsumerRecord<String, String> record = consumerRecords.poll(5, TimeUnit.SECONDS);
    Assertions.assertThat(record).isNotNull();
  }

}

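But the test fails because no message is received. I verified that the producer and the consumer use the same topic and broker address. It looks as if the EmbeddedKafka listener never subscribes to the Flux returned by the supplier.

I have the following dependencies: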
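
For readers unfamiliar with the queue hand-off used in the test: `BlockingQueue.poll(timeout, unit)` returns `null` when nothing arrives in time, which is exactly what the `isNotNull()` assertion trips over. The following is a minimal, broker-free sketch of that behaviour using only the JDK; the class name `QueueHandOffSketch`, the helper `awaitRecord`, and the payload string are hypothetical and not part of the project.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandOffSketch {

    // Waits for one element the same way the test does; returns null on timeout.
    static String awaitRecord(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();

        // Nothing has been delivered yet, so the poll times out.
        System.out.println(awaitRecord(records, 100));

        // A stand-in for the listener thread delivers one record (hypothetical payload).
        Thread listener = new Thread(() -> records.add("hypothetical-payload"));
        listener.start();
        System.out.println(awaitRecord(records, 1000));
        listener.join();
    }
}
```

A `null` from `poll` therefore only says that no record reached the listener within five seconds; it does not distinguish between a producer that never sent and a consumer reading a different topic or broker.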

  testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.6'
  testImplementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
  testImplementation 'org.springframework.kafka:spring-kafka'
  testImplementation "io.confluent:kafka-avro-serializer:${kafkaAvroSerializerVersion}"
  testImplementation 'org.springframework.cloud:spring-cloud-stream'
  testImplementation 'org.springframework.cloud:spring-cloud-schema-registry-client'

So I do not have spring-cloud-stream-test-support, which I know can cause this kind of problem.

In the test application.properties I added:

spring.cloud.stream.bindings.orderSupplier-out-0.destination=test-topic

Given my test class configuration this is redundant, but that does not matter for now.

I also have the main application.yml configuration:

spring:
  cloud:
    stream:
      bindings:
        myObjectSupplier-out-0:
          destination: ${TOPIC_NAME}
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          autoCreateTopics: false
          brokers: ${KAFKA_URL}
          jaas:
            loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
            options:
              username: ${KAFKA_USERNAME}
              password: ${KAFKA_PASSWORD}
          configuration:
            sasl.mechanism: PLAIN
            security.protocol: SASL_SSL
            ssl.endpoint.identification.algorithm: https
        bindings:
          myObjectSupplier-out-0:
            producer:
              configuration:
                key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                basic.auth.credentials.source: USER_INFO
                errors.retry.timeout: 3
                errors.retry.delay.max.ms: 15000
                schema.registry:
                  url: ${SCHEMA_REGISTRY_URL}
                  basic.auth.user.info: ${SCHEMA_REGISTRY_API_KEY}:${SCHEMA_REGISTRY_SECRET}
    schema-registry-client:
      endpoint: ${SCHEMA_REGISTRY_URL}

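So let's assume it uses a real schema registry. That is not good for testing, but it is not the problem right now.

Can anyone recommend another way to test my producer? I need a way to send test messages using Spring Cloud Function with the Kafka binder.

Thanks in advance.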

Tags: spring, apache-kafka, apache-kafka-streams, spring-cloud-function, embedded-kafka

Solution
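
A few details in the configuration shown in the question may be worth checking. Spring Cloud Stream derives the output binding name from the function bean name, so the supplier `myObjectSupplier` is bound as `myObjectSupplier-out-0`, whereas the test application.properties sets a destination for `orderSupplier-out-0`. `@SpringBootTest(properties = ...)` expects one `key=value` pair per array element, so the single comma-joined string in the test class sets only the first key and treats the rest as part of its value. The main application.yml also configures `SASL_SSL`, while the embedded broker, as far as I know, listens in plaintext by default. The fragment below is a sketch of test properties under those assumptions, not a verified fix; the `PLAINTEXT` override and placing everything in the test properties file are assumptions.

```properties
# src/test/resources/application.properties (sketch; values are assumptions)

# Point the binder at the embedded broker started by @EmbeddedKafka.
spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}

# Select the supplier bean explicitly.
spring.cloud.function.definition=myObjectSupplier

# The binding name follows <functionName>-out-<index>.
spring.cloud.stream.bindings.myObjectSupplier-out-0.destination=test-topic

# Assumption: the embedded broker uses plaintext listeners,
# so the SASL_SSL setting from application.yml is overridden for tests.
spring.cloud.stream.kafka.binder.configuration.security.protocol=PLAINTEXT
```

With these in the properties file, the `properties` attribute on `@SpringBootTest` could be dropped, or split into one array element per property if it is kept.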

