首页 > 解决方案 > 总是在取消信号上完成 Mono.usingWhen

问题描述

我们编写了一个响应式 WebFlux 应用程序,它基本上获取资源,在闭包中完成大量工作,最后使用初始版本或从闭包中更新的版本解锁资源。

前任:

Mono<ProductLock> lock = service.lock()

Mono.usingWhen(lock,
            (ProductLock state) -> service.doLongOperationAsync(state),
            (ProductLock state) -> service.unlock(state))

ProductLock是锁定产品的定义,意味着我们可以通过 HTTP API 进行操作,包括多个微服务。

service.doLongOperationAsync()- 调用一些 HTTP API,预计一旦启动就总是完成(失败 - 是正常的,但如果操作开始 - 它需要完成,因为 HTTP 调用无法回滚)

service.unlock()- 必须仅在 doLongOperationAsync 执行成功或失败后调用操作。

在快乐的情况下,一切都按预期工作:成功解锁产品,失败也解锁。

当调用我们的服务(SOAP UI、POSTMAN、任何真实客户端)的客户端断开连接或超时时,就会出现问题 - 生成取消信号并启动到上面的代码。此时,其中的任何内容service.doLongOperationAsync都会停止并service.unlock异步调用取消。

问题:我们怎样才能防止这种情况发生。要求是:

  1. 一旦doLongOperationAsync开始 - 它必须完成
  2. service.unlock(state)- 必须仅在 之后调用doLongOperationAsync,即使在取消时也是如此。

Spring Boot 复制

MRE:

@Bean
public RouterFunction<ServerResponse> route() {
    return RouterFunctions.route().GET("/work", request -> processRequest()
        .flatMap(res -> ServerResponse.ok().bodyValue(res))).build();
}

private Mono<String> processRequest() {
    //Need unlock to execute exactly after doWorkAsync in any case
    return Mono.usingWhen(lock(), this::doWorkAsync, this::unlock)
        .doOnNext((id) -> System.out.println("Request processed:" + id))
        .doOnCancel(() -> System.out.println("Request cancelled"));
}

private Mono<UUID> lock() {
    return Mono.defer(() -> Mono.just(UUID.randomUUID())
        .doOnNext(id -> System.out.println("Locked:" + id)));
}

//Need this to finish no matter what
private Mono<String> doWorkAsync(UUID lockID) {
    return Mono.just(lockID).map(UUID::toString)
        .doOnNext(id -> System.out.println("Start working on:" + lockID))
        .delayElement(Duration.ofSeconds(10))
        .doOnNext(id -> System.out.println("Finished work on:" + id))
        // Should never be called
        .doOnCancel(() -> System.out.println("Processing cancelled:" + lockID));
}

private Mono<Void> unlock(UUID lockID) {
    return Mono.fromRunnable(() -> System.out.println("Unlocking:" + lockID));
}

标签: spring-webfluxproject-reactorreactor-netty

解决方案


显然,对用例有用的是使用 Mono/Flux create with sink。这样,它在订阅时启动,但即使在 Cancellation 请求时仍不会取消:

Mono.create(sink -> {
   doWorkAsync().subscribe(sink::next, sink::error, null, sink.currentContext())
})

使用时,整体usingWhen应放在里面create


推荐阅读