首页 > 解决方案 > 在 Project Reactor 中对 doOnNext 执行触发并忘记操作

问题描述

我有一个Flux流。对于每个处理的元素,我希望触发一个异步/非阻塞的动作。例如,Mono从数据库更新返回 a 的方法。我希望这个动作在doOnNext块上完成。我不想影响Flux那里实施的处理和背压。

假设Mono要调用的方法是

Mono<Integer> dbUpdate();

Flux应该是这样吗?

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data));
}

或者应该如堆栈溢出示例中所述。

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data).subscribe());
}

以上不会导致内部阻塞问题doOnNext吗?

还有哪个是最适合用于此类操作的调度程序?

标签: javaspringreactive-programmingspring-webfluxproject-reactor

解决方案


dbUpdate()如果您不订阅它将被忽略。以下代码段不会打印任何内容,因为Mono.just("db update")没有被订阅。

Mono<String> dbUpdate() {
    return Mono.just("db update")
        .doOnNext(System.out::println);
}

public Flux<String> processData() {
    return Flux.just("item 1", "item 2")
        .doOnNext(data -> dbUpdate());
}

请注意,.subscribe()这不会阻止您的线程,它会启动工作并立即返回。


推荐阅读