首页 > 解决方案 > 从 Akka Actors 列表中异步收集和组合响应

问题描述

我有一个名为 Akka Actor Gate,它使用orStatus的响应消息来回答消息:OpenClosed

"A stateless gate" must {
    "be open" in {
      val parent = TestProbe()
      val gate = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      gate ! 7
      gate ! Gate.Status
      parent.expectMsg(Gate.Open)
    }

我想做的是构造一个逻辑与门,它查询门列表,Open如果它们都打开则返回:

"A logical AND gate" must {
    "be open when all children are open" in {
      val parent = TestProbe()
      val parent2 = TestProbe()
      val gate_1 = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      val gate_2 = parent.childActorOf(
        TestStatelessGate.props(5)
      )
      val gate_list = List(gate_1, gate_2)
      val and_gate = parent2.childActorOf(
        LogicalAndGate.props(gate_list)
      )
      gate_1 ! 7
      gate_2 ! 5
      and_gate ! Gate.Status
      parent2.expectMsg(Gate.Open)

forScala 文档对使用表达式有很好的说明,pipe 这里. 该文档的相关部分是:

final case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout = Timeout(5 seconds) // needed for `?` below

val f: Future[Result] =
  for {
    x <- ask(actorA, Request).mapTo[Int] // call pattern directly
    s <- actorB.ask(Request).mapTo[String] // call by implicit conversion
    d <- (actorC ? Request).mapTo[Double] // call by symbolic name
  } yield Result(x, s, d)

f.pipeTo(actorD

我正在尝试使用 ActorRefs 列表(gate_list在下面的代码中)做这样的事情:

override def receive: Receive = {
    case Status => {
      val futures: Seq[Future[Any]] =
        for (g <- gate_list)
          yield ask(g, Status)
      val all_open: Future[Boolean] = Future {
        !futures.contains(Closed)
        }
      pipe(all_open) to parent
    }
  }

当然,这是行不通的,因为futures.contains(Closed)要比较两种不同类型的事物,aFuture[Any]和我的案例对象。

标签: scalaakka

解决方案


我假设OpenandClosedcase object继承自一些共同特征的值OpenClosed

首先,您需要使用mapToask结果转换为OpenClosed. 我也会使用map而不是for

val futures: Seq[Future[OpenClosed]] =
  gate_list.map(g => ask(g, Status).mapTo[OpenClosed])

然后你需要Future.sequence等待所有这些完成:

Future.sequence(futures).onComplete {
  case Success(res) =>
    parent ! res.forall(_ == Open)
  case Failure(_) =>
    parent ! Closed
}

推荐阅读