首页 > 解决方案 > Mono.fromCallable 线程行为

问题描述

我试图了解 Reactor 中阻塞函数的行为,但其他一些事情完全让我脱离了研究。这是代码:

  public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 100_000)
        .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
        .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
        .subscribe();
    System.out.println("Here");
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static int blockingMethod(int s) {
    try {
      Thread.sleep(100_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return s;
  }

这是AFAIK发生的事情的摘要:

  1. 订阅发生在main线程上。

  2. main在内部变得自由flatMap以从上游带来下一个元素。因此,doOnNext应始终打印main.

  3. 处理 100_000 个元素后,main将变为 free 并 print here

相反,这就是发生的事情:

  1. 前 256 个元素按预期打印在main(in ) 上。doOnNext

  2. 大约 1 秒后,接下来是 256,然后是下一个,依此类推。从第二批开始的元素被打印在elastic线程上。

以下是我的问题:

  1. 为什么要批量处理 256 个元素?Schedulers.elastic()应该按需创建线程,因此理想情况下应该始终有一个线程可用于从 main 获取请求(忽略 JVM 对我可以创建的线程数量的限制)。

  2. 为什么将第二批(及以后)中的元素打印在elastic线程上?我希望它们能在main. 事实上,当您删除阻塞调用时会发生这种情况

       public static void main(String[] args) throws InterruptedException {
           Flux.range(1, 100_000)
               .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
               .flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
               .subscribe();
          System.out.println("Here");
          Thread.sleep(Integer.MAX_VALUE);
      }
    

在这里,所有元素都打印出来maindoOnNext并且here仅在流完成时才打印(释放主线程)。

我错过了什么吗?

标签: reactive-programmingproject-reactor

解决方案


您需要了解整个反应器是基于反应流规范构建的。因此,上面的每个运算符都是发布者和订阅者的组合。

在场景 2

   public static void main(String[] args) throws InterruptedException {
       Flux.range(1, 100_000)
           .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
           .flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
           .subscribe();
      System.out.println("Here");
      Thread.sleep(Integer.MAX_VALUE);
  }

发布者从主线程 (Flux.Range) 开始,订阅者 (doOnNext) 正在订阅(在主线程上调用 onNext 调用),它被委托给一个弹性线程,该线程的消耗速度比订阅者发布的速度要快。所以整个 Flux.Range 在主线程上被调用并被分发到弹性线程池来处理

在场景 1

  public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 100_000)
        .doOnNext(a -> System.out.println(a + ", Publisher: " + Thread.currentThread().getName()))
        .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic())).doOnNext(a -> System.out.println(a + ", Subscriber: " + Thread.currentThread().getName()))
        .subscribe();
    System.out.println("Here");
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static int blockingMethod(int s) {
    try {
      Thread.sleep(100_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return s;
  }

Flux.Range 的前 256 次调用发生在主线程上。请记住,响应式流是背压驱动的编程,并且由于平面地图只能合并 256 个上游,平面地图运算符(它是订阅者)不会在 256 个事件后调用 onNext 调用,因为它们都被阻塞/等待完全的。

主线程现在被释放了,因为它完成了参与反应管道并走出传送带的责任。即基本上继续执行下一行代码。这就是为什么您会在前 256 个元素之后看到“Here”的原因。然后主线程继续按照代码无限期地阻塞。

一旦其中一个阻塞线程完成工作,它们将触发订阅者在完成较早请求的线程(即弹性线程)上调用 onNext。弹性线程现在会将调用委托给内部反应器管道中的另一个弹性线程。

这意味着对于我发出的同一个项目,打印“Publisher”的线程和打印“Subscriber”的线程是不一样的,即使它们是弹性线程。随意验证以下代码


推荐阅读