spring-webflux - 总是在取消信号上完成 Mono.usingWhen
问题描述
我们编写了一个响应式 WebFlux 应用程序,它基本上获取资源,在闭包中完成大量工作,最后使用初始版本或从闭包中更新的版本解锁资源。
前任:
Mono<ProductLock> lock = service.lock()
Mono.usingWhen(lock,
(ProductLock state) -> service.doLongOperationAsync(state),
(ProductLock state) -> service.unlock(state))
ProductLock
是锁定产品的定义,意味着我们可以通过 HTTP API 进行操作,包括多个微服务。
service.doLongOperationAsync()
- 调用一些 HTTP API,预计一旦启动就总是完成(失败 - 是正常的,但如果操作开始 - 它需要完成,因为 HTTP 调用无法回滚)
service.unlock()
- 必须仅在 doLongOperationAsync 执行成功或失败后调用操作。
在快乐的情况下,一切都按预期工作:成功解锁产品,失败也解锁。
当调用我们的服务(SOAP UI、POSTMAN、任何真实客户端)的客户端断开连接或超时时,就会出现问题 - 生成取消信号并启动到上面的代码。此时,其中的任何内容service.doLongOperationAsync
都会停止并service.unlock
异步调用取消。
问题:我们怎样才能防止这种情况发生。要求是:
- 一旦
doLongOperationAsync
开始 - 它必须完成 service.unlock(state)
- 必须仅在 之后调用doLongOperationAsync
,即使在取消时也是如此。
MRE:
@Bean
public RouterFunction<ServerResponse> route() {
return RouterFunctions.route().GET("/work", request -> processRequest()
.flatMap(res -> ServerResponse.ok().bodyValue(res))).build();
}
private Mono<String> processRequest() {
//Need unlock to execute exactly after doWorkAsync in any case
return Mono.usingWhen(lock(), this::doWorkAsync, this::unlock)
.doOnNext((id) -> System.out.println("Request processed:" + id))
.doOnCancel(() -> System.out.println("Request cancelled"));
}
private Mono<UUID> lock() {
return Mono.defer(() -> Mono.just(UUID.randomUUID())
.doOnNext(id -> System.out.println("Locked:" + id)));
}
//Need this to finish no matter what
private Mono<String> doWorkAsync(UUID lockID) {
return Mono.just(lockID).map(UUID::toString)
.doOnNext(id -> System.out.println("Start working on:" + lockID))
.delayElement(Duration.ofSeconds(10))
.doOnNext(id -> System.out.println("Finished work on:" + id))
// Should never be called
.doOnCancel(() -> System.out.println("Processing cancelled:" + lockID));
}
private Mono<Void> unlock(UUID lockID) {
return Mono.fromRunnable(() -> System.out.println("Unlocking:" + lockID));
}
解决方案
显然,对用例有用的是使用 Mono/Flux create with sink。这样,它在订阅时启动,但即使在 Cancellation 请求时仍不会取消:
Mono.create(sink -> {
doWorkAsync().subscribe(sink::next, sink::error, null, sink.currentContext())
})
使用时,整体usingWhen
应放在里面create
推荐阅读
- python - 如何将 csv 读入列表和字典
- typescript - 错误 TS2339:“文件上传”类型上不存在属性“_input”
- javascript - 无法访问响应数据对象
- mysql - Mysql Date Column:为什么有时使用索引键有时不使用?
- c# - 有没有办法在不截屏 C# 的情况下捕获屏幕像素的颜色?
- python - 如何设置副轴的主定位器
- javascript - 如何在JS中找到事件目标的父节点的索引?
- reactjs - 当我注销时,警告:无法对未安装的组件执行 React 状态更新
- webauthn - 在 publicKeyCredentialCreationOptions 中传递用户对象的值(用于 webauthn)
- firebase - 如何获取文档 ID 列表?