首页 > 解决方案 > 猫效应:非阻塞 IO 使用哪个线程池?

问题描述

从本教程https://github.com/sluc/concurrency-in-scala-with-ce#threading 异步操作分为 3 组,并且需要显着不同的线程池来运行:

非阻塞异步操作:

线程数非常少(甚至可能只有一个)的有界池,具有非常高的优先级。这些线程基本上大部分时间都处于空闲状态并不断轮询是否有新的异步 IO 通知。这些线程花费在处理请求上的时间直接映射到应用程序延迟,因此除了接收通知并将它们转发到应用程序的其余部分之外,在这个池中没有其他工作完成是非常重要的。线程数非常少(甚至可能只有一个)的有界池,具有非常高的优先级。这些线程基本上大部分时间都处于空闲状态并不断轮询是否有新的异步 IO 通知。这些线程处理请求所花费的时间直接映射到应用程序延迟,所以它'

阻塞异步操作:

无界缓存池。无界,因为阻塞操作可以(并且将会)阻塞线程一段时间,同时我们希望能够服务其他 I/O 请求。缓存是因为我们可能会因为创建太多线程而耗尽内存,所以重用现有线程很重要。

CPU密集型操作:

固定池,其中线程数等于 CPU 内核数。这很简单。过去的“黄金法则”是线程数 = CPU 内核数 + 1,但“+1”是因为总是为 I/O 保留一个额外的线程(如上所述,我们现在有单独的池)。

在我的 Cats Effect 应用程序中,我使用基于 Scala Future 的 ReactiveMongo 库来访问 MongoDB,它在与 MongoDB 对话时不会阻塞线程,例如执行非阻塞 IO。

它需要执行上下文。Cats 效果提供默认执行上下文IOApp.executionContext

我的问题是:我应该为非阻塞 io 使用哪个执行上下文?

IOApp.executionContext?

但是,从IOApp.executionContext文档中:

为应用程序提供默认的 ExecutionContext。

JVM 之上的默认设置是基于可用 CPU 的可用数量延迟构建为固定线程池(请参阅 PoolUtils)。

似乎这个执行上下文属于我上面列出的第三组 - CPU-heavy operations (Fixed pool in which number of threads equals the number of CPU cores.),这让我认为 IOApp.executionContext 不是非阻塞 IO 的好候选者。

我是对的吗,我应该为非阻塞 IO 创建一个带有固定线程池(1 或 2 个线程)的单独上下文(因此它将属于我上面列出的第一组 - Non-blocking asynchronous operations: Bounded pool with a very low number of threads (maybe even just one), with a very high priority.)?

或者是IOApp.executionContext为 CPU-bound 和 Non-Blocking IO 操作而设计的?

我用来将 Scala Future 转换为 F 并排除执行上下文的函数:

def scalaFutureToF[F[_]: Async, A](
      future: => Future[A]
  )(implicit ec: ExecutionContext): F[A] =
    Async[F].async { cb =>
      future.onComplete {
        case Success(value)     => cb(Right(value))
        case Failure(exception) => cb(Left(exception))
      }
    }

标签: scalafuturecats-effect

解决方案


在 Cats Effect 3 中,每个IOApp都有Runtime

final class IORuntime private[effect] (
  val compute: ExecutionContext,
  private[effect] val blocking: ExecutionContext,
  val scheduler: Scheduler,
  val shutdown: () => Unit,
  val config: IORuntimeConfig,
  private[effect] val fiberErrorCbs: FiberErrorHashtable = new FiberErrorHashtable(16)
)

您几乎总是希望保留默认值,而不是随意声明自己的运行时,除非在测试或教育示例中。

在您的内部,您IOApp可以compute通过以下方式访问游泳池:

runtime.compute

如果要执行阻塞操作,则可以使用以下blocking构造:

blocking(IO(println("foo!"))) >> IO.unit

这样,您就告诉 CE3 运行时此操作可能会阻塞,因此应该分派到专用池。见这里

CE2呢?嗯,它有类似的机制,但它们非常笨重,而且还包含很多惊喜。例如,阻塞调用被安排使用Blocker,然后必须以某种方式凭空召唤或线程通过整个应用程序,线程池定义是使用笨拙的ContextShift. 如果您对此有任何选择,我强烈建议您投入一些精力来迁移到 CE3

很好,但是 Reactive Mongo 呢?

ReactiveMongo 使用 Netty(基于 Java NIO API)。而且Netty有自己的线程池。这在 Netty 5 中有所改变(参见此处),但 ReactiveMongo 似乎仍在 Netty 4 上(参见此处)。

但是,ExecutionContext您要询问的是将执行回调的线程池。这可以是您的计算池。

让我们看一些代码。首先,你的翻译方法。我刚改成因为我用的是CE3,我加了线程printline asyncasync_

def scalaFutureToF[F[_]: Async, A](future: => Future[A])(implicit ec: ExecutionContext): F[A] =
  Async[F].async_ { cb =>
    future.onComplete {
      case Success(value)     => {
        println(s"Inside Callback: [${Thread.currentThread.getName}]")
        cb(Right(value))
      }
      case Failure(exception) => cb(Left(exception))
    }
  }

现在让我们假设我们有两个执行上下文——一个来自我们的IOApp,另一个代表 ReactiveMongo 用来运行Future. 这是编造的 ReactiveMongo 之一:

val reactiveMongoContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

另一个是简单的runtime.compute

现在让我们Future这样定义:

def myFuture: Future[Unit] = Future {
  println(s"Inside Future: [${Thread.currentThread.getName}]")
}(reactiveMongoContext)

请注意我们是如何Future通过将 传递给它来假装它在 ReactiveMongo 内部运行的reactiveMongoContext

最后,让我们运行应用程序:

override def run: IO[Unit] = {
  val myContext: ExecutionContext = runtime.compute
  scalaFutureToF(myFuture)(implicitly[Async[IO]], myContext)
}

这是输出:

未来内部:[pool-1-thread-1]
内部回调:[io-compute-6]

我们提供的执行上下文scalaFutureToF只是运行回调。Future 本身运行在我们单独的线程池上,它代表 ReactiveMongo 的池。实际上,您将无法控制这个池,因为它来自 ReactiveMongo。

额外信息

顺便说一句,如果您不使用类型类层次结构 ( F),而是IO直接使用值,那么您可以使用这个简化的方法:

def scalaFutureToIo[A](future: => Future[A]): IO[A] =
  IO.fromFuture(IO(future))

看看这个甚至不需要你通过ExecutionContext- 它只是使用你的计算池。或者更具体地说,它使用为 定义的任何内容def executionContext: F[ExecutionContext]Async[IO]结果是计算池。让我们检查:

override def run: IO[Unit] = {
  IO.executionContext.map(ec => println(ec == runtime.compute))
}
// prints true

最后但并非最不重要的:

如果我们真的有办法指定 ReactiveMongo 的底层 Netty 应该使用哪个线程池,那么是的,在这种情况下我们绝对应该使用一个单独的池。我们永远不应该将我们的runtime.compute池提供给其他运行时。


推荐阅读