Spring Cloud - Kafka Streams Binder - Testing KafkaStreamsProcessor

Question

I have been trying to write a unit test for a KafkaStreamsProcessor. Here is the processor code:

@EnableBinding(KafkaStreamsProcessor.class)
public class StockProcessor {

    private static final Log LOG = LogFactory.getLog(StockProcessor.class);

    @Autowired
    private EddieClient client;

    @Autowired
    private InventoryRepository inventoryRepository;

    @Autowired
    private PermissionRepository permissionRepository;

    /**
     * Receive message from input queue
     * Apply business logic
     * Send to output queue
     *
     * @param inputMessage the message
     * @return outputMessage
     */
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<?, OutputMessage> process(KStream<?, InputMessage> inputMessage){
        return inputMessage
            .map((key, value) -> {
                LOG.info("::: processing message...");
               //  ... business logic
                return new KeyValue<>(key, outputMessage);
            });
    }
}

application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers:
              - ${NX_KAFKA_SERVERS}
      bindings:
        input:
          destination: ${NX_INPUT_TOPIC}
          content-type: application/json
          group: ${NX_PULL_GROUP_ID}
        output:
          destination: ${NX_OUTPUT_TOPIC}
          content-type: application/json
          group: ${NX_PUSH_GROUP_ID}

Here is what I have read about and tried to do in the unit test:

public class StockProcessorTest {

    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";

    @SpyBean
    private StockProcessor stockProcessor;

    @MockBean
    private EddieClient client;

    @MockBean
    private InventoryRepository inventoryRepository;

    @MockBean
    private PermissionRepository permissionRepository;

    private TopologyTestDriver topologyTestDriver;
    private TestInputTopic<String, InputMessage> inputTopic;
    private TestOutputTopic<String, OutputMessage> outputTopic;

    private Topology topology;
    private Properties config;

    KStream<String, InputMessage> inputMessageStream;
    KStream<String, OutputMessage> outputMessageStream;

    @Before
    public void setup() {
        config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        inputMessageStream = streamsBuilder.stream(INPUT_TOPIC);
        stockProcessor.process(inputMessageStream).to(OUTPUT_TOPIC);

        topology = streamsBuilder.build();
        topologyTestDriver = new TopologyTestDriver(topology, config);

        //???
    }
}

I really don't know whether I am on the right track here. I am using the Jackson serializer. How do I create the inputTopic and outputTopic and test my business logic?

I can provide any further details needed. Thanks in advance.

Tags: spring-kafka, spring-cloud-stream

Answer


Here are some examples of how to unit test Kafka Streams applications based on Spring Cloud Stream: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java

In addition, here is a test suite with some more advanced examples: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-inventory-count/src/test/java/kafka/streams/inventory/count

These examples show in detail how to use Serdes with the test driver.

Please check them and see whether they satisfy your testing requirements.
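
For orientation, here is a minimal sketch of how the test driver and Serdes typically fit together. It is not taken from the linked samples. It assumes kafka-streams-test-utils (Kafka 2.4 or later, where TestInputTopic and TestOutputTopic were introduced) is on the test classpath. The class name ProcessorTopologySketch, the runOnce helper, and the upper-casing process method are hypothetical stand-ins for StockProcessor and its business logic, and plain String values stand in for InputMessage and OutputMessage so that only the built-in String Serde is needed.

```java
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ProcessorTopologySketch {

    static final String INPUT_TOPIC = "input-topic";
    static final String OUTPUT_TOPIC = "output-topic";

    // Hypothetical stand-in for StockProcessor.process(...):
    // keeps the key and upper-cases the value.
    static KStream<String, String> process(KStream<String, String> input) {
        return input.map((key, value) ->
                new KeyValue<>(key, value.toUpperCase(Locale.ROOT)));
    }

    // Builds the topology, pipes one record through it, and returns the result.
    static KeyValue<String, String> runOnce(String key, String value) {
        // Assumption: String payloads. For JSON payloads, a JSON Serde for
        // InputMessage / OutputMessage would be used in the same positions.
        Serde<String> serde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
                builder.stream(INPUT_TOPIC, Consumed.with(serde, serde));
        process(input).to(OUTPUT_TOPIC, Produced.with(serde, serde));
        Topology topology = builder.build();

        Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
        // Never contacted: the test driver runs without a broker.
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            // The input topic serializes what the test writes;
            // the output topic deserializes what the topology produced.
            TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                    INPUT_TOPIC, serde.serializer(), serde.serializer());
            TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(
                    OUTPUT_TOPIC, serde.deserializer(), serde.deserializer());

            inputTopic.pipeInput(key, value);
            return outputTopic.readKeyValue();
        }
    }

    public static void main(String[] args) {
        KeyValue<String, String> result = runOnce("sku-1", "in stock");
        System.out.println(result.key + " -> " + result.value);
    }
}
```

In a real test for the processor in the question, the same Serde instances would presumably be replaced by JSON Serdes for InputMessage and OutputMessage (spring-kafka ships a JsonSerde class that can serve this purpose), and the collaborators of StockProcessor would be mocked before calling process on the stream built from the StreamsBuilder.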

