首页 > 解决方案 > 使用 Flux 持续减少状态

问题描述

假设我有两种事件类型 ( Aand B) 和Fluxes 以某种方式生成它们:

Flux<A> aFlux = ...;
Flux<B> bFlux = ...;

还有一个保存当前状态的类型,由 type 表示S

class S {
  final int val;
}

我想创建以下内容:

final S sInitial = ...;

Flux<S> sFlux = Flux.merge(aFlux, bFlux)
  .scan((a, e) -> {
    if(e instanceof A) {
      return mapA(a, (A)e);
    } else if(e instanceof B) {
      return mapB(a, (B)e);
    } else {
      throw new RuntimeException("invalid event");
    }
  })
  .startWith(sInitial);

sCurrsFlux 上次输出的实例在哪里S,以sInitialmapA/mapB返回 type 的新值开始SS和都是sInitial不可变的。

也就是说,我想:

有没有办法以其他方式重组上述流,尤其是为了避免使用instanceof

标签: javaproject-reactor

解决方案


您可以添加接口并为您的 A 和 B 类实现它

interface ToSConvertible {
    S toS(S s);
}

现在你可以使用reactor.core.publisher.Flux#scan(A, java.util.function.BiFunction<A,? super T,A>)方法:

Flux<S> sFlux = Flux.merge(aFlux, bFlux)
        .scan(sInitial, (s, e) -> e.toS(s));

推荐阅读