FS2: parallel evaluation of items with different discriminators

Problem description

In the following example, items with different discriminators ("a", "b" and "c") are evaluated (printed) in parallel:

package org.example

import cats.effect.std.Random
import cats.effect.{ExitCode, IO, IOApp, Temporal}
import cats.syntax.all._
import cats.{Applicative, Monad}
import fs2._

import scala.concurrent.duration._

object GitterQuestion extends IOApp {

  override def run(args: List[String]): IO[ExitCode] =
    Random.scalaUtilRandom[IO].flatMap { implicit random =>
      val flat = Stream(
        ("a", 1),
        ("a", 2),
        ("a", 3),

        ("b", 1),
        ("b", 2),
        ("b", 3),

        ("c", 1),
        ("c", 2),
        ("c", 3)
      ).covary[IO]

      val a = flat.filter(_._1 === "a").through(rndDelay)
      val b = flat.filter(_._1 === "b").through(rndDelay)
      val c = flat.filter(_._1 === "c").through(rndDelay)

      val nested = Stream(a, b, c)

      nested.parJoin(100).printlns.compile.drain.as(ExitCode.Success)
    }

  def rndDelay[F[_]: Monad: Random: Temporal, A]: Pipe[F, A, A] =
    in =>
      in.evalMap { v =>
        (Random[F].nextDouble.map(_.seconds) >>= Temporal[F].sleep) >> Applicative[F].pure(v)
      }
}

The output of running this program will look something like this:

(c,1)
(a,1)
(c,2)
(a,2)
(c,3)
(b,1)
(a,3)
(b,2)
(b,3)

Note that there is no reordering between items with the same discriminator - they are processed sequentially: (a, 2) is never printed before (a, 1).

In my real-world scenario the discriminator values are not known in advance and there may be many of them, but I want the same behavior. How can I do this?

Tags: scala, scala-cats, cats-effect, fs2

Solution

I think you need to roll your own groupBy function for this. You would have to create a Queue for each discriminator, and then for each Queue emit an inner Stream that pulls elements from that Queue.

Here is an untested and probably naive implementation off the top of my head:

import cats.effect.std.Queue

val nested =
  (flat.map(Some(_)) ++ Stream(None))
    // scan state: one Queue per discriminator seen so far, plus an optional
    // newly created inner Stream to emit downstream
    .evalScan(Map.empty[String, Queue[IO, Option[(String, Int)]]] -> Option.empty[Stream[IO, (String, Int)]]){
      case ((map, _), t @ Some((key, value))) =>
        if (map.contains(key))
          // known discriminator: push the element onto its Queue, emit no new inner Stream
          map(key).offer(t).as(map -> None)
        else {
          // new discriminator: create a Queue for it and emit a Stream that drains it
          for {
            q <- Queue.unbounded[IO, Option[(String, Int)]]
            _ <- q.offer(t)
            r = (map + (key -> q)) -> Some(Stream.fromQueueNoneTerminated(q))
          } yield r
        }
      case ((map, _), None) =>
        // None means the flat stream is finished: terminate every Queue and thus every inner Stream
        map.values.toList.traverse(_.offer(None))
          .as(Map.empty -> None)
    }
    .map(_._2).unNone

val parallelism: Int = ???

nested
  .map(_.through(rndDelay))
  // produce and consume in parallel in order to 
  // avoid deadlocks in case of bounded parJoin
  .prefetchN(parallelism)
  .parJoin(parallelism)
  .printlns
  .compile
  .drain
  .as(ExitCode.Success)
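
The same Queue-per-discriminator idea can also be lifted into a reusable pipe if you need it for arbitrary element and key types. The sketch below is equally untested; the groupBy name and signature are my own illustration (not an fs2 combinator), and it assumes cats-effect 3 and fs2 3.x:

import cats.effect.Concurrent
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.{Pipe, Stream}

// Illustrative helper, not an fs2 API: one Queue per key,
// each key's elements are re-emitted as a none-terminated inner Stream.
def groupBy[F[_]: Concurrent, A, K](key: A => K): Pipe[F, A, Stream[F, A]] =
  in =>
    in.noneTerminate
      .evalScan(Map.empty[K, Queue[F, Option[A]]] -> Option.empty[Stream[F, A]]) {
        case ((queues, _), Some(a)) =>
          queues.get(key(a)) match {
            // known key: push onto its Queue, emit no new inner Stream
            case Some(q) => q.offer(Some(a)).as(queues -> None)
            // new key: create a Queue and emit a Stream that drains it
            case None =>
              for {
                q <- Queue.unbounded[F, Option[A]]
                _ <- q.offer(Some(a))
              } yield (queues + (key(a) -> q)) -> Some(Stream.fromQueueNoneTerminated(q))
          }
        case ((queues, _), None) =>
          // upstream finished: terminate every Queue and thus every inner Stream
          queues.values.toList.traverse_(_.offer(None))
            .as(Map.empty[K, Queue[F, Option[A]]] -> None)
      }
      .map(_._2).unNone

With that in place, the example above would become flat.through(groupBy[IO, (String, Int), String](_._1)).map(_.through(rndDelay)).prefetchN(parallelism).parJoin(parallelism).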
