首页 > 解决方案 > subscribeOn(Schedulers.parallel()) 不工作

问题描述

我正在学习反应堆核心并关注此https://www.baeldung.com/reactor-core

ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(arrList::add);

System.out.println("After: " + arrList);

当我执行上面的代码行时,给出了。

 Before: []
 [DEBUG] (main) Using Console logging
 After: []

上面的代码行应该在另一个线程中开始执行,但它根本不起作用。有人可以帮我吗?

标签: reactive-programmingspring-webfluxreactive-streams

解决方案


正如 Reactor 文档中提到的各种subscribe方法:

请记住,由于序列可以是异步的,这将立即将控制权返回给调用线程。这可以给人一种印象,例如在主线程或单元测试中执行时没有调用消费者。

这意味着到达 main 方法的末尾,因此主线程在任何线程能够订阅 Reactive 链之前退出,正如 Piotr 所提到的。

您要做的是等到整个链完成后再打印数组的内容。

这样做的天真方法是:

    ArrayList<Integer> arrList = new ArrayList<>();
    System.out.println("Before: " + arrList);
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(arrList::add)
            .blockLast();

    System.out.println("After: " + arrList);

在这里,您在主线程上阻塞执行,直到处理完 Flux 上的最后一个元素。因此,在您的 ArrayList 完全填充之前,最后一个 System.out 不会执行。

请记住,代码在控制台应用程序中的运行方式与在 Netty 等服务器环境中运行的方式略有不同。使控制台应用程序等待所有订阅启动的唯一方法是block.

但是并行线程上不允许阻塞。所以这种方法在 Netty 环境中是行不通的。在那里,您的服务器将一直运行,直到明确关闭,所以 asubscribe会很好。

但是,在上面的代码片段中,您不仅要阻止应用程序退出,还要在读取已填充的数据之前等待。

对上述代码的改进如下:

    ArrayList<Integer> arrList = new ArrayList<>();
    System.out.println("Before: " + arrList);
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(arrList::add)
            .doOnComplete(() -> System.out.println("After: " + arrList))
    .blockLast();

即使在这里,也doOnComplete可以从反应链外部访问数据。为了防止这种情况,您将在链本身中收集 Flux 的元素,如下所示:

    System.out.println("Before.");
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .collectList()
            .doOnSuccess(list -> System.out.println("After: " + list))
    .block();

同样,请记住,在 Netty 中运行时(例如 Spring Webflux 应用程序),上述代码将以subscribe().

但是请注意,从 Flux 切换到 List(或任何 Collection)意味着您正在从响应式范式切换到命令式编程。您应该能够在反应式范式本身内实现任何功能。


推荐阅读