scala - Akka - 如何阻止整个循环而不是收到的每条消息
问题描述
我正在尝试编写一个演员层次结构,它可以并行查找 0 到 5 之间的数字。
这是我的代码:
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
class A1 extends Actor {
override def receive = {
case "test" => {
val r = new scala.util.Random
val r1 = 0 + r.nextInt(( 5 - 0) + 1)
if(r1 == 1) {
sender() ! "found"
}
else {
sender() ! "NotFound"
}
}
}
}
object BTest extends App{
val actorSystem = ActorSystem("firstActorSystem")
val a1 = actorSystem.actorOf(Props[A1], "A1")
implicit val timeout = Timeout(5 seconds)
var answer = ""
while(answer != "found") {
answer = Await.result(a1 ? "test", timeout.duration).toString
Thread.sleep(1000)
println("answer : " + answer)
}
}
根据1
找到数字的时间,它会打印以下各种内容:
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : found
我不确定如何在阻塞的同时同时执行演员,直到1
找到数字。我认为我的解决方案会阻塞,直到使用Await.result
.
如何阻止整个循环而不是收到的每条消息并在1
找到号码时解除阻止?
解决方案
这个问题的答案归结为如何组合多个Future
s 直到其中一个返回您感兴趣的值。您可能希望在任何给定时间对同时运行的期货数量设置一些上限。
您可以像这样使用 Akka Streams 来实现它:
implicit val materializer: Materializer = Materializer(actorSystem)
val maxConcurrency = 16
val future =
Source.repeat(1)
.mapAsync(maxConcurrency)(_ => a1 ? "test")
.filter("found" == _)
.runWith(Sink.head)
Await.result(future , timeout.duration).toString
推荐阅读
- android - 导航架构组件导航,只有一个上下文
- python - Google Colab 在包含大量文件的云端硬盘文件夹中遇到问题
- c++ - malloc: *** 对象 0x100519860 的错误:未分配被释放的指针 *** 在 malloc_error_break 中设置断点以进行调试
- java - Kotlin:扩展动态给定的超类型
- android - TextView:获取最后一行末尾的 x 和 y
- python - 将 JSON 数据加载到 pandas 数据框中并创建自定义列
- python - 根据行标签合并行 - pandas python
- python - 如何有效地索引 Groupby 对象?
- yocto - 如何为 yocto systemd 服务添加配方
- libcurl - libcurl:如何从活动连接中获取文件描述符?