java - 关于 Project Reactor 的 flatMap 中的线程的困惑
问题描述
我正在玩 Project Reactor 和响应式 MongoDB 存储库。我有以下代码:
@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
@Id
Integer id;
String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
和主要@SpringBootApplication
课程:
@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {
private final ReactivePersonRepository reactivePersonRepository;
public static void main(String[] args) {
SpringApplication.run(ReactiveDatabaseApplication.class, args);
}
@PostConstruct
public void postConstruct() {
Scheduler single = Schedulers.newSingle("single-scheduler");
IntStream.range(0, 10).forEach(i ->
Flux.just(Person.builder()
.id(i)
.name("PersonName")
.build())
.flatMap(personToSave -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.save(personToSave);
})
//.publishOn(single)
.flatMap(savedPerson -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.findById(savedPerson.getId());
})
//.publishOn(single)
.flatMap(foundPerson -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.deleteById(foundPerson.getId());
})
//.publishOn(single)
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName()))));
}
}
Flux::subscribeOn
方法描述说:
因此,将这个运算符放在链中的任何位置也会影响 onNext/onError/onComplete 信号的执行上下文,从链的开头到 * 下一次出现 {@link publishOn(Scheduler) publishOn}
这让我有点困惑,因为当我publishOn
在处理链中没有指定任何内容时,线程名称的打印值为:
从线程 single-scheduler-1 中救人 - 正如预期的那样
从线程 Thread-13 中查找人员
从线程 Thread-6 中查找人员
从线程 Thread-15 中查找人员
从线程 Thread-6 中删除人员
从线程 Thread-5 中删除人员
从线程 Thread-4 中删除人员
我不明白为什么。每次执行不subscribeOn
应该使用方法中指定的调度程序吗?flatMap
当我取消注释时publishOn
,一切都由给定的单个调度程序执行,这是预期的。
任何人都可以解释为什么单个调度程序不被flatMap
操作使用,当没有时publishOn
?
解决方案
这个人为的例子可能会更清楚:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
.flatMap(x -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName())));
这将给出类似于:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
或者,换句话说,您的反应式存储库没有在同一个调度程序上发布,这就是您看到您所做的行为的原因。“直到下一次出现publishOn()
”并不意味着您的代码下一次调用publishOn()
- 它也可以在您的任何flatMap()
调用中的任何发布者中,您将无法控制。
推荐阅读
- kubernetes - 有没有办法对两个依赖部署执行滚动更新?
- javascript - 从 JSON 比较中排除丢失的键
- python - 解决问题的不同方法的复杂性
- javascript - 如何禁用选择器(Bootstrap-Select)的实际“下拉”?
- swift - iOS13 UIAlertController 具有自定义视图和首选样式作为操作表灰度所有颜色
- python - 寻找工资帽的空间复杂度
- python - 如何在不删除瓷砖的情况下在 folium 地图上创建单选按钮?
- javascript - 从特定 DOM 部分搜索数据
- makefile - 在多个目录上并行执行
- python-3.x - 如何序列化列表中的转义字符串