reactive-programming - 主线程上的通量/发布者
问题描述
我是响应式编程/项目反应器的新手,试图理解这些概念。使用 range 方法创建了一个 Flux 并订阅了。当我查看日志时,一切都在主线程上运行。
Flux
.range(1, 5)
.log()
.subscribe(System.out::println);
System.out.println("End of Execution");
[DEBUG] ( main ) 使用控制台日志记录 [INFO] ( main ) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] ( main ) | 请求(无界)[信息](主要)| onNext(1) 1 [ 信息] (主要) | onNext(2) 2 [ 信息] (主要) | onNext(3) 3 [ 信息] (主要) | onNext(4) 4 [ 信息] (主要) | onNext(5) 5 [ 信息] (主要) | onComplete() 执行结束
一旦 Publisher 完成了所有元素的发射,那么只有其余的代码被执行(System.out.println("End of Execution");在上面的例子中)。Publisher会默认阻塞线程吗?如果我更改调度程序,似乎它没有阻塞线程。
Flux
.range(1, 5)
.log()
.subscribeOn(Schedulers.elastic())
.subscribe(System.out::println);
System.out.println("End of Execution");
Thread.sleep(10000);
[DEBUG] (main) 使用控制台日志记录End of Execution [ INFO] ( elastic-2 ) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] ( elastic-2 ) | 请求(无界)[信息](弹性 2)| onNext(1) 1 [ INFO] ( elastic-2 ) | onNext(2) 2 [ INFO] ( elastic-2 ) | onNext(3) 3 [ INFO] ( elastic-2 ) | onNext(4) 4 [ 信息] ( elastic-2 ) | onNext(5) 5 [ INFO] ( elastic-2 ) | 完成()
解决方案
Reactor 默认不强制执行并发模型,是的,许多操作员将在操作发生的Thread
地方继续工作。subscribe()
但这并不意味着使用 Reactor 会阻塞主线程。您展示的示例正在执行内存中的工作,不涉及 I/O 或延迟。此外,它会立即订阅结果。
您可以尝试以下代码段并查看不同的内容:
Flux.range(1, 5)
.delayElements(Duration.ofMillis(100))
.log()
.subscribe(System.out::println);
System.out.println("End of Execution");
在日志中,我看到:
INFO --- [main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
INFO --- [main] reactor.Flux.ConcatMap.1 : request(unbounded)
End of Execution
在这种情况下,延迟元素将以不同的方式安排工作 - 因为这里没有任何东西可以让 JVM 保持活动状态,所以应用程序退出并且没有消耗该范围内的元素。
在更常见的情况下,将涉及 I/O 和延迟,并且将以适当的方式安排工作,并且不会阻塞主应用程序线程。
推荐阅读
- flutter - 如何在 FutureBuilder 小部件的 builder 方法中获取未来的结果?
- sql - 在 SQL Server 中使用单个案例表达式更新多列
- python - 将数组的项形式索引添加到另一个数组{Python}
- html - 无法在 Web 服务器上加载图像和插件
- javascript - 将数据添加到包含特定数据且位于多个 div 中的 div
- visual-studio - AWS Lambda SQS 触发器
- flutter - Flutter WebChromeClient 等效,使用 openFileChooser 和 onShowFileChooser
- amazon-web-services - 创建请求正文和模板 API GATEWAY CDK
- java - 如何通过其 ID 持久化具有多对一字段的新 JSON 对象?
- istio - Istio 使用企业代理背后的 TLS 发起访问外部站点