首页 > 解决方案 > 单元测试 Spring Cloud Stream Producer-Processor-Consumer 场景

问题描述

我使用 Spring Cloud Scenario 为生产者-处理器-消费者场景创建了一个示例应用程序。在这里,我使用了基于传统注释的方法。

在单元测试中,我想测试一个简单的场景,即生成一条消息并在它经过转换后断言消费的消息。但我没有在消费者绑定端收到消息。请让我知道这里可能缺少什么。

生产者.java

@EnableBinding(MyProcessor.class)
public class Producer {

    @Bean
    @InboundChannelAdapter(value = MyProcessor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> produceMessage() {
        return () -> new GenericMessage<>("Hello Spring Cloud World >>> " + Instant.now());
    }
}

变换处理器.java

@EnableBinding(MyProcessor.class)
public class TransformProcessor {

    @Transformer(inputChannel = MyProcessor.OUTPUT, outputChannel = MyProcessor.INPUT)
    public String transform(String message) {
        System.out.println("Transforming the message: " + message);
        return message.toUpperCase();
    }
}

消费者.java

@EnableBinding(MyProcessor.class)
public class Consumer {

    @StreamListener(MyProcessor.INPUT)
    public void consume(String message) {
        System.out.println("Consuming transformed message: " + message);
    }
}

我的处理器.java

public interface MyProcessor {
    String INPUT = "my-input";
    final static String OUTPUT = "my-output";

    @Input(INPUT)
    SubscribableChannel anInput();

    @Output(OUTPUT)
    MessageChannel anOutput();
}

SpringCloudStreamLegacyApplicationTests.java

@SpringBootTest
class SpringCloudStreamLegacyApplicationTests {

    @Autowired
    private MyProcessor myProcessor;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void testConsumer() {

        myProcessor.anOutput().send(new GenericMessage<byte[]>("hello".getBytes()));

        Message<?> poll = messageCollector.forChannel(myProcessor.anInput()).poll();
        System.out.println("Received: " + poll.getPayload());
    }

}


在这里,我希望在handleMessage方法中收到一条消息。

请注意,我正在使用以下依赖项进行测试:


        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>

标签: javaspringspring-cloud-stream

解决方案


推荐阅读