首页 > 解决方案 > 猫效应和异步 IO 细节

问题描述

几天来,我一直在关注猫效应和 IO。而且我觉得我对这种效果有一些误解,或者只是我错过了它的意义。

  1. 首先——如果 IO 可以替代 Scala 的 Future,我们如何创建异步 IO 任务?使用IO.shift? 使用IO.async? 是IO.delay同步还是异步?我们可以用这样的代码做一个通用的异步任务Async[F].delay(...)吗?或者当我们用unsafeToAsyncor调用 IO 时会发生异步unsafeToFuture
  2. 猫效应中的异步和并发有什么意义?他们为什么分开?
  3. IO 是绿色线程吗?如果是,为什么猫效应中有一个 Fiber 对象?据我了解,Fiber 是绿色线程,但文档声称我们可以将 IO 视为绿色线程。

我将不胜感激任何澄清,因为我未能理解那些和互联网上的猫效应文档并没有那么有帮助......

标签: scalascala-catscats-effect

解决方案


如果 IO 可以替代 Scala 的 Future,我们如何创建异步 IO 任务

首先,我们需要澄清什么是异步任务。通常async的意思是“不阻塞操作系统线程”,但是既然你提到Future了,那就有点模糊了。说,如果我写:

Future { (1 to 1000000).foreach(println) }

它不会是async,因为它是一个阻塞循环和阻塞输出,但它可能会在不同的操作系统线程上执行,由隐式 ExecutionContext 管理。等效的猫效应代码将是:

for {
  _ <- IO.shift
  _ <- IO.delay { (1 to 1000000).foreach(println) }
} yield ()

(这不是较短的版本)

所以,

  • IO.shift用于可能更改线程/线程池。Future每次操作都会这样做,但在性能方面它不是免费的。
  • IO.delay{ ... } (aka IO { ... }) 不会进行任何异步操作,也不会切换线程。它用于IO从同步的副作用 API 创建简单的值

现在,让我们回到真正的 async。这里要理解的是:

     每个异步计算都可以表示为一个接受回调的函数。

无论您使用的是返回的 APIFuture还是 Java 的CompletableFuture,还是类似 NIO 的东西CompletionHandler,都可以转换为回调。这就是IO.async目的:您可以将任何接受回调的函数转换为IO. 如果是这样:

for {
  _ <- IO.async { ... }
  _ <- IO(println("Done"))
} yield ()

Done仅当(以及如果)回调中的计算时才会打印...。您可以将其视为阻塞绿色线程,而不是 OS 线程。

所以,

  • IO.async用于将任何已经异步的计算转换为IO.
  • IO.delay用于将任何完全同步的计算转换为IO.
  • 具有真正异步计算的代码的行为就像它阻塞了一个绿色线程。

使用 s 时最接近的类比Future是创建 ascala.concurrent.Promise和返回p.future


或者当我们使用 unsafeToAsync 或 unsafeToFuture 调用 IO 时会发生异步?

有点。使用IO,除非您调用其中之一(或使用) ,否则不会发生任何事情。IOApp但是 IO 不保证您将在不同的操作系统线程上执行,甚至异步执行,除非您使用IO.shiftor明确要求IO.async

您可以随时使用 eg 保证线程切换(IO.shift *> myIO).unsafeRunAsyncAndForget()。这正是可能的,因为myIO在被要求之前不会执行,无论您将它作为val myIO还是def myIO.

但是,您不能神奇地将阻塞操作转换为非阻塞操作。Future使用或使用都不可能IO


猫效应中的异步和并发有什么意义?他们为什么分开?

AsyncConcurrent(and Sync) 是类型类。它们的设计使程序员可以避免被锁定cats.effect.IO并可以为您提供支持您选择的任何内容的 API,例如 monix Task 或 Scalaz 8 ZIO,甚至是OptionT[Task, *something*]. fs2、monix 和 http4s 等库利用它们为您提供更多使用它们的选择。

Concurrent在 之上添加额外的东西Async,其中最重要的是.cancelableand .start。这些与 没有直接的类比Future,因为它根本不支持取消。

.cancelable是一个版本,.async它允许您还指定一些逻辑来取消您正在包装的操作。一个常见的例子是网络请求——如果你不再对结果感兴趣,你可以在不等待服务器响应的情况下中止它们,并且不要浪费任何套接字或处理时间来读取响应。你可能永远不会直接使用它,但它有它的位置。

但是如果你不能取消它们,那么可取消的操作有什么用呢?这里的主要观察是您不能从其内部取消操作。其他人必须做出这个决定,这将与操作本身同时发生(这是类型类的名称)。这就是.start进来的地方。简而言之,

      .start是一个绿色线程的显式分支。

做事someIO.start类似于做事val t = new Thread(someRunnable); t.start(),只是现在是绿色的。并且本质上是APIFiber的精简版本:你可以做,就像,但它不会阻塞 OS 线程;和,这是 的安全版本。Thread.joinThread#join().cancel.interrupt()


请注意,还有其他方法可以分叉绿色线程。例如,做并行操作:

val ids: List[Int] = List.range(1, 1000)
def processId(id: Int): IO[Unit] = ???
val processAll: IO[Unit] = ids.parTraverse_(processId)

会将所有 ID 分叉到绿色线程,然后将它们全部加入。或使用.race

val fetchFromS3: IO[String] = ???
val fetchFromOtherNode: IO[String] = ???

val fetchWhateverIsFaster = IO.race(fetchFromS3, fetchFromOtherNode).map(_.merge)

将并行执行提取,给你第一个结果完成并自动取消较慢的提取。所以,做.start和使用Fiber并不是派生更多绿色线程的唯一方法,只是最明确的一种。这就是答案:

IO 是绿色线程吗?如果是,为什么猫效应中有一个 Fiber 对象?据我了解,Fiber 是绿色线程,但文档声称我们可以将 IO 视为绿色线程。

  • IO就像一个绿色线程,这意味着您可以让许多线程并行运行,而不会产生 OS 线程的开销,并且用于理解的代码的行为就像是阻塞了要计算的结果一样。

  • Fiber是一个控制绿色线程显式分叉(等待完成或取消)的工具。


推荐阅读