首页 > 解决方案 > 如何防止我的 Flux 的下游订阅者因任何错误而执行?

问题描述

我目前正在做一个反应堆项目。我们遇到的一种情况需要为 Flux 中的每个元素提供服务,如果其中任何一个获取失败,我们希望整个批处理失败并防止下游事件发生。目前代码大致如下:

getFluxIdsFromDatabase()
  .flatMap(this::fetchFromExternalService)
  .collectList() // prevent downstream execution by having any errors blocked here
  .flatMapMany(Flux::fromIterable) // back to regularly scheduled Flux if no errors
  .concatMap(this::saveSequentiallyToDatabase)

这行得通......但在我看来,我真的应该能够做到这一点,而无需切换到Mono<List>只是为了停止执行错误。当我尝试制作第一行时concatMap,我看到一些元素保存到数据库中,即使在fetchFromExternalService. 那么我可以在这里调用其他一些 Flux 方法来进行映射,从而在出现错误时停止一切,还是我坚持使用这个collectList/flatMapMany(Flux::fromIterable)解决方案?

标签: javaproject-reactor

解决方案


推荐阅读