scala - scala ZIO foreachPar
问题描述
我是并行编程和 ZIO 的新手,我正在尝试通过并行请求从 API 获取数据。
import sttp.client._
import zio.{Task, ZIO}
ZIO.foreach(files) { file =>
getData(file)
Task(file.getName)
}
def getData(file: File) = {
val data: String = readData(file)
val request = basicRequest.body(data).post(uri"$url")
.headers(content -> "text", char -> "utf-8")
.response(asString)
implicit val backend: SttpBackend[Identity, Nothing, NothingT] = HttpURLConnectionBackend()
request.send().body
resquest.Response match {
case Success(value) => {
val src = new PrintWriter(new File(filename))
src.write(value.toString)
src.close()
}
case Failure(exception) => log error
}
当我按顺序执行程序时,如果我尝试并行运行,它会按预期工作,方法是更改ZIO.foreach
为ZIO.foreachPar
. 该程序过早终止,我明白了,我在这里缺少一些基本的东西,任何帮助都可以帮助我解决问题。
解决方案
一般来说,我不建议将同步阻塞代码与异步非阻塞代码混合,这是 ZIO 的主要角色。可以这么说,关于如何有效地将 ZIO 与“世界”一起使用,有一些很棒的演讲。
我要提出两个关键点,一是 ZIO 让您可以通过附加分配和完成步骤来有效地管理资源,二是“效果”我们可以说是“实际与世界交互的事物”应该尽可能地包含在最紧凑的范围内* .
所以让我们稍微看一下这个例子,首先,我不建议使用默认Identity
的后端后端ZIO
,我建议使用AsyncHttpClientZioBackend
。
import sttp.client._
import zio.{Task, ZIO}
import zio.blocking.effectBlocking
import sttp.client.asynchttpclient.zio.AsyncHttpClientZioBackend
// Extract the common elements of the request
val baseRequest = basicRequest.post(uri"$url")
.headers(content -> "text", char -> "utf-8")
.response(asString)
// Produces a writer which is wrapped in a `Managed` allowing it to be properly
// closed after being used
def managedWriter(filename: String): Managed[IOException, PrintWriter] =
ZManaged.fromAutoCloseable(UIO(new PrintWriter(new File(filename))))
// This returns an effect which produces an `SttpBackend`, thus we flatMap over it
// to extract the backend.
val program = AsyncHttpClientZioBackend().flatMap { implicit backend =>
ZIO.foreachPar(files) { file =>
for {
// Wrap the synchronous reading of data in a `Task`, but which allows runs this effect on a "blocking" threadpool instead of blocking the main one.
data <- effectBlocking(readData(file))
// `send` will return a `Task` because it is using the implicit backend in scope
resp <- baseRequest.body(data).send()
// Build the managed writer, then "use" it to produce an effect, at the end of `use` it will automatically close the writer.
_ <- managedWriter("").use(w => Task(w.write(resp.body.toString)))
} yield ()
}
}
此时,您将只program
需要使用其中一种unsafe
方法运行,或者如果您使用的是zio.App
通过main
方法。
*
并非总是可行或方便,但它很有用,因为它通过将任务返回给运行时进行调度来防止资源占用。
推荐阅读
- javascript - 在 ionRangeSlider 中添加 onChange 事件延迟
- java - 如何在我的 jsp 页面上使用计数查询显示函数的结果?
- c - 运行 for 循环时的奇怪输出
- flutter - Flutter SizeTransition 和 PageRouteBuilder 不起作用
- angularjs - 'protractor' 不是内部或外部命令、可运行程序或批处理文件
- ios - 有权在 Swift 中读取视频文件但不能读取图像文件?
- python - Python/scrapy 嵌套的 for/if 循环工作不正确
- python - 如何导入 CSV 以分隔变量以便为 Matplotlib 条形图“计数”
- python - wxpython:同时水平居中和右对齐两个控件
- python - Python 3 中的 range() 和 xrange()