java - 如何将组合函数转换为反应式管道?
问题描述
我目前有一个处理管道定义为java.util.function.Function
s 的组合,我想在反应性上下文中使用它,比如用Flux#transform
. 我应该如何修改pipeline()
的签名以使其适合作为参数Flux#transform
?
Function<A, C> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f1.andThen(f2_io).andThen(f3).andThen(f4_io).andThen(f5).andThen(f6_io);
}
第一次转换尝试可能是仅将返回的类型从更改Function<A, C>
为Function<Flux<A>, Flux<C>>
:
Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f -> f.map(f1).map(f2_io).map(f3).map(f4_io).map(f5).map(f6_io);
}
这里最大的优势是f1
..的签名f6
不需要更改。如果这些函数都没有进行 I/O 操作会很好,但是,因为它们的后缀提示 ,f2_io
和f4_io
可能f6_io
会阻塞 I/O 操作,所以我认为它们应该返回Mono
s。
如果我们更改Function<A, B> f2_io
为Function<A, Mono<B>> f2_io
,我们会Flux<Mono<B>>
在第二个之后得到a map()
。以下函数的返回类型应该如何改变?我可以避免更改UnaryOperator<B> f3
为UnaryOperator<Mono<B>> f3
吗?
解决方案
map
以同步方式转换元素,换句话说,在每个操作上使用映射块,直到评估完成。所以map
不是一个适合阻塞 io 的算子。异步flatMap
运算符系列将是这里的正确选择。
如果函数本身不返回响应式类型,则始终可以将函数包装在其中Mono
以实现异步执行而无需更改函数:
Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f -> f.map(f1)
.flatMap(v -> Mono.fromCallable(() -> f2_io.apply(v)))
.map(f3)
.flatMap(v -> Mono.fromCallable(() -> f4_io.apply(v)))
.map(f5)
.flatMap(v -> Mono.fromCallable(() -> f6_io.apply(v)));
}
顺便说一句,有一个很棒的工具BlockHound可以检测反应管道中非阻塞调度程序的阻塞调用。
推荐阅读
- reactjs - [React]:如果需要,UseRef to scrollIntoView 不同线程的最后一条消息
- sql-server - PARTITION BY 行值
- r - 如何根据一系列值匹配数据
- python - 我是否将 lgbm 正确用于分类特征?
- node.js - 删除文件夹共享点 Typescript 中的用户直接访问权限
- laravel - Laravel:未定义的变量帖子
- java - 在 Java 中将 2D 列表转换为 2D ArrayList
- database - 领域查询顺序是否一致?
- javascript - 尝试使用 Chrome API topSites
- c - C数组的反汇编给出了奇怪的结果