首页 > 解决方案 > 反应堆按需通量或水槽

问题描述

考虑一个 HTTP 控制器端点,它接受请求、验证然后返回 ack,但同时在后台做一些“繁重的工作”。

Reactor(我感兴趣)有两种方法可以实现:

第一种方法

@PostMapping(..)
fun acceptRequest(request: Request): Response {
  if(isValid(request)) {
    Mono.just(request)
      .flatMap(service::doHeavyWork)
      .subscribe(...)
    return Response(202)
  } else {
    return Response(400)
  }
}

第二种方法

class Controller {
  private val service = ...
  private val sink = Sinks.many().unicast().onBackpressureBuffer<Request>()
  private val stream = sink.asFlux().flatMap(service::doHeavyWork).subscribe(..)


  fun acceptRequest(request: Request): Response {
    if(isValid(request)) {
      sink.tryEmitNext(request) // for simplicity not handling errors
      return Response(202)
    } else {
      return Response(400)
    }
  }
}

哪种方法更好/更差,为什么?

我要问的原因是,在 Akka 中,按需构建流并不是很有效,因为流每次都需要实现,所以最好采用“接收器方法”。我想知道这是否也适用于反应堆,或者使用这些方法是否还有其他优点/缺点。

标签: spring-bootreactive-programmingproject-reactor

解决方案


我对 Akka 不太熟悉,但是使用 Reactor 构建反应链绝对不会引起巨大的开销——这是处理请求的“正常”方式。因此,我认为不需要像第二种方法那样使用单独的接收器 - 这似乎只是增加了复杂性而收效甚微。因此,我会说第一种方法更好。

话虽如此,通常不建议像在这两个示例中那样订阅自己 - 但这种“一劳永逸”的工作是可能有意义的少数情况之一。我在这里提出的其他一些潜在警告可能值得考虑:

  • 你称这项工作“繁重”,我不确定这是否意味着它的 CPU 很重,或者只是 IO 很重,或者需要很长时间。如果只是因为触发了一堆请求而需要很长时间,那没什么大不了的。但是,如果它的 CPU 很重,那么如果您不小心,这可能会导致问题 - 您可能不想在事件循环线程上执行 CPU 繁重的任务。在这种情况下,我可能会创建一个由您自己的执行程序服务支持的单独调度程序,然后用于subscribeOn()卸载那些 CPU 繁重的任务。
  • 请记住,在这种情况下,“一劳永逸”的模式实际上是“忘记”——您完全无法知道您卸载的繁重任务是否有效,因为您基本上通过自我订阅将这些信息扔掉了. 根据您的用例,这可能没问题,但如果任务很关键,或者如果它失败了您需要某种反馈,那么值得考虑的是这可能不是最好的方法。

推荐阅读