首页 > 解决方案 > 关于 Project Reactor 的 flatMap 中的线程的困惑

问题描述

我正在玩 Project Reactor 和响应式 MongoDB 存储库。我有以下代码:

@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
    @Id
    Integer id;
    String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}

和主要@SpringBootApplication课程:

@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {

    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
                Flux.just(Person.builder()
                        .id(i)
                        .name("PersonName")
                        .build())
                        .flatMap(personToSave -> {
                            System.out.println(String.format(
                                    "Saving person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.save(personToSave);
                        })
                        //.publishOn(single)
                        .flatMap(savedPerson -> {
                            System.out.println(String.format(
                                    "Finding person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.findById(savedPerson.getId());
                        })
                        //.publishOn(single)
                        .flatMap(foundPerson -> {
                            System.out.println(String.format(
                                    "Deleting person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.deleteById(foundPerson.getId());
                        })
                        //.publishOn(single)
                        .subscribeOn(single)
                        .subscribe(aVoid -> System.out.println(String.format(
                                "Subscription from thread %s", Thread.currentThread().getName()))));
    }
}

Flux::subscribeOn方法描述说:

因此,将这个运算符放在链中的任何位置也会影响 onNext/onError/onComplete 信号的执行上下文,从链的开头到 * 下一次出现 {@link publishOn(Scheduler) publishOn}

这让我有点困惑,因为当我publishOn在处理链中没有指定任何内容时,线程名称的打印值为:

从线程 single-scheduler-1 中救人 - 正如预期的那样

从线程 Thread-13 中查找人员

从线程 Thread-6 中查找人员

从线程 Thread-15 中查找人员

从线程 Thread-6 中删除人员

从线程 Thread-5 中删除人员

从线程 Thread-4 中删除人员

我不明白为什么。每次执行不subscribeOn应该使用方法中指定的调度程序吗?flatMap

当我取消注释时publishOn,一切都由给定的单个调度程序执行,这是预期的。

任何人都可以解释为什么单个调度程序不被flatMap操作使用,当没有时publishOn

标签: javaspringspring-bootspring-webfluxproject-reactor

解决方案


这个人为的例子可能会更清楚:

Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
        .flatMap(x -> {
            System.out.println(String.format(
                    "Saving person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Finding person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Deleting person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .subscribeOn(single)
        .subscribe(aVoid -> System.out.println(String.format(
        "Subscription from thread %s", Thread.currentThread().getName())));

这将给出类似于:

Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4

或者,换句话说,您的反应式存储库没有在同一个调度程序上发布,就是您看到您所做的行为的原因。“直到下一次出现publishOn()”并不意味着您的代码下一次调用publishOn()- 它也可以在您的任何flatMap()调用中的任何发布者中,您将无法控制。


推荐阅读