java - Spring Reactor:Optional 对应的类是什么?
问题描述
所以我有一个Flux<Foo>
并且我想将每个映射Foo
到Baz
. 问题是,getBaz(Foo foo)
可能会抛出一个IOException
.
所以我想有Mono<Baz> getBazRx(Foo foo)
一个方法可以返回 aMono.just(baz)
或Mono.empty()
在异常的情况下返回。
然后将最终以Flux<Mono<Baz>>
哪种方式提醒Optional<T>
容器。
这是在 Spring Reactor 中这样做的方式吗?如何正确食用?
解决方案
在反应式流中,“可选”通常通过从流中删除不存在的元素来处理(例如,一个空的Mono
,或Flux
删除了元素的 a。),而不是拥有一个Flux<Optional>
,,Mono<Optional>
或Flux<Mono>
调用同步getBaz
方法时,可以使用单个.handle
操作,如下所示:
flux
.handle((foo, sink) -> {
try {
// propagate Baz down the stream
sink.next(getBaz(foo));
} catch (IOException e) {
// Since sink.next is not called here,
// the problematic element will be dropped from the stream
log.error(e);
}
})
调用异步getBazRx
方法(返回Mono
)时,可以在//onErrorResume
内部使用,如下所示:flatMap
flatMapSequential
concatMap
flux
.flatMap(foo -> getBazRx(foo)
.onErrorResume(t -> {
log.error(t);
return Mono.empty();
}))
(或者您可以移动.onErrorResume
到内部.getBazRx
,具体取决于您要捕获并忽略异常的位置)
另外,既然你在你的问题中提到了它......如果你要创建getBazRx
那个 wraps getBaz
,你不应该做这样的事情,如果getBaz
有可能阻止:
Mono<Baz> getBazRx(Foo foo) {
// BAD!!!
try {
return Mono.just(getBaz(foo));
} catch (IOException e) {
return Mono.error(e) // or Mono.empty() if you want to ignore
}
}
该实现实际上只是模拟异步方法的同步方法。它有两个问题:
- 工作立即完成,而不是在订阅返回后
Mono
- 如果
getBaz
阻塞,您最终可能会阻塞事件循环
相反,您应该推迟工作,直到订阅单声道,并在用于阻塞操作的Scheduler
目的上运行任何阻塞操作,如下所示:
Mono<Baz> getBazRx(Foo foo) {
return Mono.fromSupplier(() -> {
try {
return getBaz(foo);
} catch (IOException e) {
throw Exceptions.propagate(e); // or return null to ignore and complete empty
}
})
.subscribeOn(Schedulers.elastic()); // run on a scheduler suitable for blocking work
}
推荐阅读
- flutter - 当用户在颤动中单击其他地方时如何关闭小吃店?
- python-3.x - Python 的 asyncio.gather() 似乎没有异步运行任务
- ios - 如何将文件从缓存移动/复制到用户的文档目录?[迅捷,颤振]
- regex - 使用正则表达式在@之后获取十进制数
- ios - 【UISearchController / UISearchBar】如何为搜索图标添加点击手势?
- discord - Discord bot 看不到成员更新
- angular - 改变垫滑块的起始位置
- python - 从python中的矩阵中提取一列
- flutter - 为什么使用 context.select 代替 context.watch 会导致范围错误?
- solidity - 在测试网上使用 Remix 部署合约时出现问题