首页 > 解决方案 > Spring Reactor:Optional 对应的类是什么?

问题描述

所以我有一个Flux<Foo>并且我想将每个映射FooBaz. 问题是,getBaz(Foo foo)可能会抛出一个IOException.

所以我想有Mono<Baz> getBazRx(Foo foo)一个方法可以返回 aMono.just(baz)Mono.empty()在异常的情况下返回。

然后将最终以Flux<Mono<Baz>>哪种方式提醒Optional<T>容器。

这是在 Spring Reactor 中这样做的方式吗?如何正确食用?

标签: javaspringreactive-programmingproject-reactorreactor

解决方案


在反应式流中,“可选”通常通过从流中删除不存在的元素来处理(例如,一个空的Mono,或Flux删除了元素的 a。),而不是拥有一个Flux<Optional>,,Mono<Optional>Flux<Mono>

调用同步getBaz方法时,可以使用单个.handle操作,如下所示:

flux
    .handle((foo, sink) -> {
        try {
            // propagate Baz down the stream
            sink.next(getBaz(foo));
        } catch (IOException e) {
            // Since sink.next is not called here,
            // the problematic element will be dropped from the stream
            log.error(e);
        }
    })

调用异步getBazRx方法(返回Mono)时,可以在//onErrorResume内部使用,如下所示:flatMapflatMapSequentialconcatMap

flux
    .flatMap(foo -> getBazRx(foo)
        .onErrorResume(t -> {
            log.error(t);
            return Mono.empty();
        }))

(或者您可以移动.onErrorResume到内部.getBazRx,具体取决于您要捕获并忽略异常的位置)

另外,既然你在你的问题中提到了它......如果你要创建getBazRx那个 wraps getBaz,你不应该这样的事情,如果getBaz有可能阻止:

Mono<Baz> getBazRx(Foo foo) {
    // BAD!!!
    try {
        return Mono.just(getBaz(foo));
    } catch (IOException e) {
        return Mono.error(e)  // or Mono.empty() if you want to ignore
    }
}

该实现实际上只是模拟异步方法的同步方法。它有两个问题:

  1. 工作立即完成,而不是在订阅返回后Mono
  2. 如果getBaz阻塞,您最终可能会阻塞事件循环

相反,您应该推迟工作,直到订阅单声道,并在用于阻塞操作的Scheduler目的上运行任何阻塞操作,如下所示:

Mono<Baz> getBazRx(Foo foo) {
    return Mono.fromSupplier(() -> {
            try {
                return getBaz(foo);
            } catch (IOException e) {
                throw Exceptions.propagate(e);  // or return null to ignore and complete empty
            }
        })
        .subscribeOn(Schedulers.elastic());  // run on a scheduler suitable for blocking work
}

推荐阅读