首页 > 解决方案 > 在反应式 Spring-Webflux 应用程序中使用 Spring AMQP @RabbitListener 时如何触发重试

问题描述

我有一个 spring-webflux 应用程序,它必须使用来自 rabbitMQ 的消息。在以前的应用程序中,当不使用 spring-webflux 时,我能够:

  1. 声明队列时配置重试策略
  2. 使用 @RabbitListener 注解设置一个 rabbit 监听器
  3. 通过在处理函数中抛出异常来触发重试

在 spring-webflux 中我无法抛出错误,我只有一个 MonoError,如何触发重试?

我的代码目前看起来像这样

@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {

  private final VehicleService service;
  private final OperationFactory operationFactory;

  @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
  void handleVehicleUpdated(Message message) {
    Mono.just(message)
      .map(operationFactory::generateOperationFromMessage)
      .flatMap(service::handleOperation) // want to retry if downstream app is down
      .subscribe();
  }
}

编辑

我现在发现这是可能的。例如,如果客户端代码返回 aMono<Exception>那么这将触发重试。同样,我可以有条件地触发重试我到Mono<Exception>. 例如,如果我想在消息中的产品不存在时触发重试,我可以执行以下操作

repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("my exception")))
        .then(...) // carry on if it does exist

标签: javaspringspring-webfluxspring-amqpreactive

解决方案


将反应器与非反应式侦听器容器一起使用具有许多挑战。

  1. 在反应流程完成后,您必须使用 MANUAL ack 和 ack/nack 交付。
  2. 您必须使用反应器的重试机制。

考虑查看https://github.com/reactor/reactor-rabbitmq项目而不是 Spring AMQP。在未来的某个时候,我们希望构建响应式@RabbitListener,但它们还没有。


推荐阅读