首页 > 解决方案 > WebFlux/Reactor:使用 doOnComplete 检查 Flux 执行前后的条件

问题描述

我已经在查询一些外部资源Flux.using()。现在我想实现一种乐观锁定:在查询开始执行之前读取一些状态并检查它是否在查询完成后更新。如果是这样 - 抛出一些异常来中断 http 请求处理。

我通过使用实现了这一点doOnComplete

final AtomicReference<String> initialState = new AtomicReference<>();

return Flux.just("some", "constant", "data")
    .doOnComplete(() -> initialState.set(getState()))
    .concatWith(Flux.using(...)) //actual data query
    .doOnComplete(() -> {if (!initialState.get().equals(getState())) throw new RuntimeException();})
    .concatWithValues("another", "constant", "data")

我的问题:

  1. 这是对的吗?是否保证第一个doOnCompletelambda 会在之前完成Flux.using(),是否保证第二个doOnCompletelambda 会在之后严格执行?
  2. 是否存在更优雅的解决方案?

标签: reactive-programmingspring-webfluxproject-reactor

解决方案


第一个doOnComplete将在Flux.just("some", "constant", "data")发出所有元素后执行,第二个将在中定义的发出 PublisherconcatWith成功完成后执行。这是有效的,因为两个发布者的元素数量都是有限的。

然而,使用建议的方法,来自特定操作的前置/后置条件在更高级别的操作之外处理。换句话说,属于该操作的条件检查正在泄漏到通量定义。

建议,将条件检查下推到操作:

var otherElements = Flux.using( // actual data query
        () -> "other",
        x -> {
            var initialState = getState();
            return Flux.just(x).doOnComplete(() ->
                { if (!initialState.equals(getState())) throw new IllegalStateException(); }
            );
        },
        x -> { }
);

Flux.just("some", "constant", "data")
        .concatWith(otherElements)
        .concatWith(Mono.just("another")) // "constant", "data" ...


推荐阅读