scala - FS2 对具有不同鉴别器的项目进行并行评估
问题描述
在以下示例中,具有不同鉴别器( 和 )的项目"a"
被"b"
并行"c"
评估(打印):
package org.example
import cats.effect.std.Random
import cats.effect.{ExitCode, IO, IOApp, Temporal}
import cats.syntax.all._
import cats.{Applicative, Monad}
import fs2._
import scala.concurrent.duration._
object GitterQuestion extends IOApp {
override def run(args: List[String]): IO[ExitCode] =
Random.scalaUtilRandom[IO].flatMap { implicit random =>
val flat = Stream(
("a", 1),
("a", 2),
("a", 3),
("b", 1),
("b", 2),
("b", 3),
("c", 1),
("c", 2),
("c", 3)
).covary[IO]
val a = flat.filter(_._1 === "a").through(rndDelay)
val b = flat.filter(_._1 === "b").through(rndDelay)
val c = flat.filter(_._1 === "c").through(rndDelay)
val nested = Stream(a, b, c)
nested.parJoin(100).printlns.compile.drain.as(ExitCode.Success)
}
def rndDelay[F[_]: Monad: Random: Temporal, A]: Pipe[F, A, A] =
in =>
in.evalMap { v =>
(Random[F].nextDouble.map(_.seconds) >>= Temporal[F].sleep) >> Applicative[F].pure(v)
}
}
运行此程序的结果将类似于以下内容:
(c,1)
(a,1)
(c,2)
(a,2)
(c,3)
(b,1)
(a,3)
(b,2)
(b,3)
请注意,具有相同鉴别器的项目之间没有重新排序 - 它们是按顺序处理的。(a, 2)
以前永远不会打印(a, 1)
。
在我的现实世界场景中,鉴别器的值是不知道的,可能有很多,但我想有同样的行为,我该怎么做?
解决方案
我认为您需要为此推出自己的groupBy
功能。我认为您必须Queue
为每个鉴别器创建一个。然后对于每一个Queue
发出一个Stream
从那个中提取元素的内部Queue
。
这是我想到的未经测试且可能很幼稚的实现:
import cats.effect.std.Queue
val nested =
(flat.map(Some(_)) ++ Stream(None))
.evalScan(Map.empty[String, Queue[IO, Option[(String, Int)]]] -> Option.empty[Stream[IO, (String, Int)]]){
case ((map, _), t @ Some((key, value))) =>
if (map.contains(key))
map(key).offer(t).as(map -> None)
else {
for {
q <- Queue.unbounded[IO, Option[(String, Int)]]
_ <- q.offer(t)
r = (map + (key -> q)) -> Some(Stream.fromQueueNoneTerminated(q))
} yield r
}
case ((map, _), None) =>
// None means the flat stream is finished
map.values.toList.traverse(_.offer(None))
.as(Map.empty -> None)
}
.map(_._2).unNone
val parallelism: Int = ???
nested
.map(_.through(rndDelay))
// produce and consume in parallel in order to
// avoid deadlocks in case of bounded parJoin
.prefetchN(parallelism)
.parJoin(parallelism)
.printlns
.compile
.drain
.as(ExitCode.Success)
推荐阅读
- vuex - 如何在 vuex 状态上强制执行不变量
- php - php如何从li中加载一个带有所选项目的div
- php - 当存在观察者时,如何创建将更新某些行的迁移?
- android - 为什么 ONVIF 订阅方法不起作用?
- android - 使用edittext和自定义适配器过滤Json Listview
- javascript - oracle 返回空对象
- azure - 受 Azure AD 保护的 AspCore 应用程序陷入无限身份验证重定向循环
- .net - Jwt 令牌不提供当前用户详细信息
- java - (Re) 命名 Spring Boot HealthIndicator
- c# - 第二个 For 循环值取决于第一个 For 循环值