首页 > 解决方案 > 继续通过flux.fromStream进行迭代,即使在其中一个元素抛出异常之后

问题描述

  public Mono<ResponseEntity<Data>> getData(@RequestParam List<String> tagIds){
   
   Flux<S3Object> s3ObjectFlux = Flux.fromStream(tagIds.stream())
           .parallel()
           .runOn(Schedulers.boundedElastic())
           .flatMap(id -> fetchResources(id)) //S3Exception is thrown here for one of the element 
       .sequential() 
    //here what should I use to catch the exception, log and continue 
           .ordered((u1, u2) -> u2.hashCode() - u1.hashCode());

   Mono<Data> data = s3ObjectFlux.collectList()
           .map(s3Objects -> new Data(s3Objects));  
}

我正在遍历 tagIds 并在此处获取 s3 对象,如果该对象不存在或引发任何异常,我想记录,忽略它,然后继续下一步。所有 onError* 我都试过了,它只是打破了迭代,最后我得到了空列表

标签: javareactiveflux

解决方案


推荐阅读