首页 > 解决方案 > Reactive Spring Cloud Stream 从队列中读取所有数据,而非reactive 则一一读取消息

问题描述

我注意到 Spring Cloud Stream 的反应式和非反应式行为的差异。当我使用非反应性方法时,消息会被一一阅读。这是为流侦听器截取的代码:

@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
    Thread.sleep(2000);
    System.out.println(input.toUpperCase());
}

此代码一一消费消息。我可以从 RabbitMQ 管理站点看到这一点:

在此处输入图像描述

重新启动后,应用程序继续从重新启动前完成的点开始使用消息。但是使用响应式流侦听器服务会立即从队列中获取所有消息。应用程序重新启动后,它不会继续处理消息,因为队列是空的。

这是被剪断的反应式流侦听器:

@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
    input
            .delayElements(Duration.ofSeconds(2))
            .map(String::toUpperCase)
            .doOnEach(System.out::println)
            .subscribe();
}

在此处输入图像描述

我想了解为什么会发生这种情况,是否可以在处理之前的元素之前停止读取整个队列。是我对 Reactor 操作员的错误理解还是预期的行为?是否可以以某种方式指定背压?

标签: springrabbitmqspring-cloudspring-cloud-streamspring-rabbit

解决方案


我们可能需要看看响应式是如何连接的,因为它看起来确实很奇怪。话虽如此,如果您使用的是最新的 spring-cloud-stream,我建议您对您的应用程序稍作更改并依赖 Spring Cloud Function 支持,这对于您的情况基本上为实现反应式消息处理程序提供了替代支持 - https://spring .io/blog/2018/08/28/spring-cloud-stream-fishtown-m2-2-1-0-m2-release-announcement。您应该对博客中的示例进行的唯一更改是:

@Bean
public Consumer<Flux<String>> foo() {
    //
}

推荐阅读