首页 > 解决方案 > 如何一起对 Kafka Streams 和 Producer API 进行单元测试

问题描述

目前,我有一个基本的 Kafka 流应用程序,它涉及一个只有源和处理器但没有接收器的拓扑。本质上,Topology 只处理消息的消费。至于产生消息,我们在传递给拓扑的 ProcessorSupplier 实例中调用 Producer API,特别是在被覆盖的process方法中。虽然我知道 Producer API 在这里是多余的,因为我可以简单地在拓扑中添加一个接收器,但我现在必须以这种方式设置我的流应用程序。至于测试,我尝试了kafka-streams-test-utilsTopologyTestDriver中可用的类。但是,我不仅要测试拓扑,还要测试对 Producer API 的调用。使用要求我模拟我的TopologyTestDriverProducer实例,因为它与 Streams API 是分开的。结果,由于消息没有“转发”,我无法从TopologyTestDriver单元测试中读取消息。

这是我的process方法的简化版本:

@Override
public void process(String key, String value) {
    // some data processing stuff that I leave out for simplicity sake
    String topic = "...";
    Properties props = ...;
    //Producer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
    producer.send(record);
}

这是我的示例单元测试的简化:

@Test
public void process() {
    Topology topology = new Topology();
    topology.addSource("source", "input-topic");
    topology.addProcessor("processor", ..., "source");
    Properties props = ...;

    TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

    ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    // the following line will work fine as long as the producer is mocked
    testDriver.pipeInput(factory.create("input-topic", "key", "value"));

    // since the producer is mocked, no message can be read from the output topic
    ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

    assertNull(outputRecord); // returns true
}

总结一下我的问题,有没有办法编写一个单元测试来测试拓扑中消息的消费和生产,该拓扑使用生产者 API 将消息写入传出主题?

标签: javajunitapache-kafkaapache-kafka-streamskafka-producer-api

解决方案


您不应该使用自定义Producer,而是将接收器添加到您的Topology. 调用Producer.send()是异步的,因此您可能会丢失数据。为避免数据丢失,您需要使调用同步,即获取Future返回的send()对象并在返回之前等待其完成process()。但是,这对您的吞吐量有很大影响,不建议这样做。

如果添加接收器,则可以避免这些问题,因为 Kafka Streams 现在将了解哪些数据已发送到输出主题,因此不会发生数据丢失,而 Kafka Streams 可以使用性能更高的异步调用。

除了正确性问题之外,您似乎KafkaProducer为当前代码中处理的每条消息都创建了一个新消息,这是非常低效的。此外,使用接收器将简化您的代码。当然,您还可以使用TopologyTestDriver.


推荐阅读