java - 在反应式 Spring-Webflux 应用程序中使用 Spring AMQP @RabbitListener 时如何触发重试
问题描述
我有一个 spring-webflux 应用程序,它必须使用来自 rabbitMQ 的消息。在以前的应用程序中,当不使用 spring-webflux 时,我能够:
- 声明队列时配置重试策略
- 使用 @RabbitListener 注解设置一个 rabbit 监听器
- 通过在处理函数中抛出异常来触发重试
在 spring-webflux 中我无法抛出错误,我只有一个 MonoError,如何触发重试?
我的代码目前看起来像这样
@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {
private final VehicleService service;
private final OperationFactory operationFactory;
@RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
void handleVehicleUpdated(Message message) {
Mono.just(message)
.map(operationFactory::generateOperationFromMessage)
.flatMap(service::handleOperation) // want to retry if downstream app is down
.subscribe();
}
}
编辑
我现在发现这是可能的。例如,如果客户端代码返回 aMono<Exception>
那么这将触发重试。同样,我可以有条件地触发重试我到Mono<Exception>
. 例如,如果我想在消息中的产品不存在时触发重试,我可以执行以下操作
repository.findByProductId(product.getProductId())
.hasElement()
.filter(exists -> !exists)
.flatMap(missing -> Mono.error(new Exception("my exception")))
.then(...) // carry on if it does exist
解决方案
将反应器与非反应式侦听器容器一起使用具有许多挑战。
- 在反应流程完成后,您必须使用 MANUAL ack 和 ack/nack 交付。
- 您必须使用反应器的重试机制。
考虑查看https://github.com/reactor/reactor-rabbitmq项目而不是 Spring AMQP。在未来的某个时候,我们希望构建响应式@RabbitListener
,但它们还没有。
推荐阅读
- gulp - 如何使用 Gulp 解压缩同一文件夹中的多个文件
- google-sheets - 根据另一行中的单元格匹配值对行中的范围求和
- c# - 如何从 JSON ObservableCollection 中过滤掉项目,使它们不会显示在列表中?
- mule - 最好在 Mulesoft 中使用 WS 操作
- scala - 从 DB 到 MAP 的表格数据 数据结构
- augmented-reality - AR 快速查看“无法打开对象”
- r - 为什么我们要渐近地接近值 0 比值 1 “更多”?
- yocto - 什么是 bitbake 错误-Nothing RPROVIDES mongodb
- python - 我什么时候应该使用 pathlib.Path.mkdir() vs os.mkdir() 或 os.makedirs()?
- css - 如何在Angular Material中修复mat-select-panel的位置