reactive-programming - subscribeOn(Schedulers.parallel()) 不工作
问题描述
我正在学习反应堆核心并关注此https://www.baeldung.com/reactor-core
ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(arrList::add);
System.out.println("After: " + arrList);
当我执行上面的代码行时,给出了。
Before: []
[DEBUG] (main) Using Console logging
After: []
上面的代码行应该在另一个线程中开始执行,但它根本不起作用。有人可以帮我吗?
解决方案
正如 Reactor 文档中提到的各种subscribe
方法:
请记住,由于序列可以是异步的,这将立即将控制权返回给调用线程。这可以给人一种印象,例如在主线程或单元测试中执行时没有调用消费者。
这意味着到达 main 方法的末尾,因此主线程在任何线程能够订阅 Reactive 链之前退出,正如 Piotr 所提到的。
您要做的是等到整个链完成后再打印数组的内容。
这样做的天真方法是:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.blockLast();
System.out.println("After: " + arrList);
在这里,您在主线程上阻塞执行,直到处理完 Flux 上的最后一个元素。因此,在您的 ArrayList 完全填充之前,最后一个 System.out 不会执行。
请记住,代码在控制台应用程序中的运行方式与在 Netty 等服务器环境中运行的方式略有不同。使控制台应用程序等待所有订阅启动的唯一方法是block
.
但是并行线程上不允许阻塞。所以这种方法在 Netty 环境中是行不通的。在那里,您的服务器将一直运行,直到明确关闭,所以 asubscribe
会很好。
但是,在上面的代码片段中,您不仅要阻止应用程序退出,还要在读取已填充的数据之前等待。
对上述代码的改进如下:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.doOnComplete(() -> System.out.println("After: " + arrList))
.blockLast();
即使在这里,也doOnComplete
可以从反应链外部访问数据。为了防止这种情况,您将在链本身中收集 Flux 的元素,如下所示:
System.out.println("Before.");
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.collectList()
.doOnSuccess(list -> System.out.println("After: " + list))
.block();
同样,请记住,在 Netty 中运行时(例如 Spring Webflux 应用程序),上述代码将以subscribe()
.
但是请注意,从 Flux 切换到 List(或任何 Collection)意味着您正在从响应式范式切换到命令式编程。您应该能够在反应式范式本身内实现任何功能。
推荐阅读
- angular - 无法在 Ionic4 中检查我的滑块是否已到达最后一张幻灯片
- sql - 如何删除所有相关记录?
- javascript - 为 URL 设置社交元数据
- javascript - 使用加密密码获取
- memory - 如何在堆栈上分配 gsl_vector?
- xcode - 无法将“新组”重命名为“我的组”。在 Xcode10.2
- regex - 正则表达式(ruby)删除一组字符的所有实例,除非它们位于字符串的开头
- python - /password-reset/ 处的 SMTPAuthenticationError
- jenkins - 如何在我的 Jenkins 构建描述中包含自定义变量?
- python - 在数据坐标中绘制具有恒定宽度的任意路径