首页 > 解决方案 > 主线程上的通量/发布者

问题描述

我是响应式编程/项目反应器的新手,试图理解这些概念。使用 range 方法创建了一个 Flux 并订阅了。当我查看日志时,一切都在主线程上运行。

     Flux
        .range(1, 5)
        .log()
        .subscribe(System.out::println);

    System.out.println("End of Execution");

[DEBUG] ( main ) 使用控制台日志记录 [INFO] ( main ) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] ( main ) | 请求(无界)[信息](主要)| onNext(1) 1 [ 信息] (主要) | onNext(2) 2 [ 信息] (主要) | onNext(3) 3 [ 信息] (主要) | onNext(4) 4 [ 信息] (主要) | onNext(5) 5 [ 信息] (主要) | onComplete() 执行结束

一旦 Publisher 完成了所有元素的发射,那么只有其余的代码被执行(System.out.println("End of Execution");在上面的例子中)。Publisher会默认阻塞线程吗?如果我更改调度程序,似乎它没有阻塞线程。

Flux
        .range(1, 5)
        .log()
        .subscribeOn(Schedulers.elastic())
        .subscribe(System.out::println);
    System.out.println("End of Execution");
    Thread.sleep(10000);

[DEBUG] (main) 使用控制台日志记录End of Execution [ INFO] ( elastic-2 ) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] ( elastic-2 ) | 请求(无界)[信息](弹性 2)| onNext(1) 1 [ INFO] ( elastic-2 ) | onNext(2) 2 [ INFO] ( elastic-2 ) | onNext(3) 3 [ INFO] ( elastic-2 ) | onNext(4) 4 [ 信息] ( elastic-2 ) | onNext(5) 5 [ INFO] ( elastic-2 ) | 完成()

标签: reactive-programmingspring-webfluxproject-reactor

解决方案


Reactor 默认不强制执行并发模型,是的,许多操作员将在操作发生的Thread地方继续工作。subscribe()

但这并不意味着使用 Reactor 会阻塞主线程。您展示的示例正在执行内存中的工作,不涉及 I/O 或延迟。此外,它会立即订阅结果。

您可以尝试以下代码段并查看不同的内容:

Flux.range(1, 5)
    .delayElements(Duration.ofMillis(100))
    .log()
    .subscribe(System.out::println);
System.out.println("End of Execution");

在日志中,我看到:

INFO   --- [main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
INFO   --- [main] reactor.Flux.ConcatMap.1 : request(unbounded)
End of Execution

在这种情况下,延迟元素将以不同的方式安排工作 - 因为这里没有任何东西可以让 JVM 保持活动状态,所以应用程序退出并且没有消耗该范围内的元素。

在更常见的情况下,将涉及 I/O 和延迟,并且将以适当的方式安排工作,并且不会阻塞主应用程序线程。


推荐阅读