首页 > 解决方案 > Akka - 如何阻止整个循环而不是收到的每条消息

问题描述

我正在尝试编写一个演员层次结构,它可以并行查找 0 到 5 之间的数字。

这是我的代码:

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

class A1 extends Actor {
  override def receive = {
    case "test" => {
      val r = new scala.util.Random
      val r1 = 0 + r.nextInt(( 5 - 0) + 1)
      if(r1 == 1) {
        sender() ! "found"
      }
      else {
        sender() ! "NotFound"
      }

    }
  }
}


object BTest extends App{

  val actorSystem = ActorSystem("firstActorSystem")
  val a1 = actorSystem.actorOf(Props[A1], "A1")

  implicit val timeout = Timeout(5 seconds)

  var answer = ""
  while(answer != "found") {
    answer = Await.result(a1 ? "test", timeout.duration).toString
    Thread.sleep(1000)
    println("answer : " + answer)
  }
}

根据1找到数字的时间,它会打印以下各种内容:

answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : found

我不确定如何在阻塞的同时同时执行演员,直到1找到数字。我认为我的解决方案会阻塞,直到使用Await.result.

如何阻止整个循环而不是收到的每条消息并在1找到号码时解除阻止?

标签: scalaakka

解决方案


这个问题的答案归结为如何组合多个Futures 直到其中一个返回您感兴趣的值。您可能希望在任何给定时间对同时运行的期货数量设置一些上限。

您可以像这样使用 Akka Streams 来实现它:

implicit val materializer: Materializer = Materializer(actorSystem)

val maxConcurrency = 16

val future = 
  Source.repeat(1)
        .mapAsync(maxConcurrency)(_ => a1 ? "test")
        .filter("found" == _)
        .runWith(Sink.head)

Await.result(future , timeout.duration).toString

推荐阅读