首页 > 解决方案 > 如何将组合函数转换为反应式管道?

问题描述

我目前有一个处理管道定义为java.util.function.Functions 的组合,我想在反应性上下文中使用它,比如用Flux#transform. 我应该如何修改pipeline()的签名以使其适合作为参数Flux#transform

Function<A, C> pipeline(UnaryOperator<A> f1,
    Function<A, B> f2_io,
    UnaryOperator<B> f3,
    UnaryOperator<B> f4_io,
    UnaryOperator<B> f5,
    Function<B, C> f6_io) {
  return f1.andThen(f2_io).andThen(f3).andThen(f4_io).andThen(f5).andThen(f6_io);
}

第一次转换尝试可能是仅将返回的类型从更改Function<A, C>Function<Flux<A>, Flux<C>>

Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
    Function<A, B> f2_io,
    UnaryOperator<B> f3,
    UnaryOperator<B> f4_io,
    UnaryOperator<B> f5,
    Function<B, C> f6_io) {
  return f -> f.map(f1).map(f2_io).map(f3).map(f4_io).map(f5).map(f6_io);
}

这里最大的优势是f1..的签名f6不需要更改。如果这些函数都没有进行 I/O 操作会很好,但是,因为它们的后缀提示 ,f2_iof4_io可能f6_io会阻塞 I/O 操作,所以我认为它们应该返回Monos。

如果我们更改Function<A, B> f2_ioFunction<A, Mono<B>> f2_io,我们会Flux<Mono<B>>在第二个之后得到a map()。以下函数的返回类型应该如何改变?我可以避免更改UnaryOperator<B> f3UnaryOperator<Mono<B>> f3吗?

标签: javareactive-programmingproject-reactor

解决方案


map以同步方式转换元素,换句话说,在每个操作上使用映射块,直到评估完成。所以map不是一个适合阻塞 io 的算子。异步flatMap运算符系列将是这里的正确选择。

如果函数本身不返回响应式类型,则始终可以将函数包装在其中Mono以实现异步执行而无需更改函数:

Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
                                            Function<A, B> f2_io,
                                            UnaryOperator<B> f3,
                                            UnaryOperator<B> f4_io,
                                            UnaryOperator<B> f5,
                                            Function<B, C> f6_io) {
    return f -> f.map(f1)
            .flatMap(v -> Mono.fromCallable(() -> f2_io.apply(v)))
            .map(f3)
            .flatMap(v -> Mono.fromCallable(() -> f4_io.apply(v)))
            .map(f5)
            .flatMap(v -> Mono.fromCallable(() -> f6_io.apply(v)));
}

顺便说一句,有一个很棒的工具BlockHound可以检测反应管道中非阻塞调度程序的阻塞调用。


推荐阅读