scala - 带有演员的简单 Akka 整数列表加法器
问题描述
我正在努力解决这个练习,但我找不到让它工作的方法,即使在从 Akka 网站(https://doc.akka.io/docs/akka/current/typed/actors.html )复制示例之后)。
我需要基本上只是将整数列表拆分为 n 个分区,将每个分区分配给每个 n 个参与者,让参与者对它们的所有值求和,最后向父参与者返回一条带有结果的消息,这会将其添加到累加器变量中. 我不需要做任何花哨的事情,比如日志记录、容错、性能优化等等。
import AdderMain.StartJob
import AdderWorker.ProcessDone
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
object AdderWorker {
final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
final case class ProcessDone(result: Int)
def apply(): Behaviors.Receive[Process] =
Behaviors.receiveMessage[Process] {
case Process(replyTo, payloadPartition) =>
println(s"Worker is processing data...")
val result = sumListElements(payloadPartition)
replyTo ! ProcessDone(result)
Behaviors.same
}
private def sumListElements(list: List[Int]): Int = list match {
case Nil => 0
case x :: xs => x + sumListElements(xs)
}
}
object AdderMain extends App {
final case class StartJob(payload: List[Int])
def apply(payload: List[Int]): Behavior[ProcessDone] =
Behaviors.setup { context =>
val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
payloadPartitions.foreach { l =>
val worker = context.spawn(AdderWorker(), "Stuff")
worker ! AdderWorker.Process(context.self, l)
}
Behaviors.receiveMessage { message =>
println(s"Done ${message}")
Behaviors.same
}
}
val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")
}
如您所见,为每个列表分区AdderMain
生成一个AdderWorker
并发送包含列表的“进程”消息。我得到的错误是
Error:(45, 72) type mismatch;
found : akka.actor.typed.Behavior[AdderWorker.ProcessDone]
required: akka.actor.typed.Behavior[Product with java.io.Serializable]
Note: AdderWorker.ProcessDone <: Product with java.io.Serializable, but class Behavior is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")
但它似乎源于我的代码中更大的问题......请随意纠正我我刚刚开始使用 Scala、Akka 和函数式编程。
PS:我还没有积累任何变量的结果
编辑:这是一个可行的解决方案
import java.util.UUID
import AdderWorker.ProcessDone
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}
object AdderWorker {
final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
final case class ProcessDone(result: Int)
def apply(): Behaviors.Receive[Process] =
Behaviors.receiveMessage[Process] {
case Process(replyTo, payloadPartition) =>
println(s"Worker is processing data...")
val result = sumListElements(payloadPartition)
replyTo ! ProcessDone(result)
Behaviors.stopped
}
private def sumListElements(list: List[Int]): Int = list match {
case Nil => 0
case x :: xs => x + sumListElements(xs)
}
}
object BasicAdder extends App {
var counter = 0
var max = 0
var sum = 0
final case class StartJob(payload: List[Int])
def apply(payload: List[Int]): Behavior[ProcessDone] =
Behaviors.setup { context =>
val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
max = payloadPartitions.size
payloadPartitions.foreach { l =>
val worker = context.spawn(AdderWorker(), UUID.randomUUID.toString)
// Send the Process message to the newly spawned worker
// providing a reference to the guardian actor and a partition of the payload
worker ! AdderWorker.Process(context.self, l)
}
Behaviors.receiveMessage { message =>
// this is thread-safe
sum += message.result
counter += 1
if (counter < max) {
println(s"Current sum: ${sum}")
Behaviors.same
} else {
println(s"Final sum: ${sum}")
Behaviors.stopped
}
}
}
val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100) // result should be 550
val system: ActorSystem[AdderWorker.ProcessDone] = ActorSystem(BasicAdder(payload), "AdderMain")
}
解决方案
推荐阅读
- python - 如何对 WSGI 应用程序进行重定向?
- postgresql - 如果没有任何变化,postgresql如何停止触发添加行
- r - R devtools::install_github / install_bitbucket - 不构建时未安装小插图
- c - 函数需要异常长的时间才能返回
- java - LOGCAT 显示错误,即使在成功构建后我的应用程序也无法打开
- git - Git:如何更新标签描述并推送到 GitHub?
- android - 为什么我的 React Native Android 应用程序在关闭或按下后退按钮时会崩溃?
- python-3.x - NameError:名称“paramiko”未定义
- ios - 多列 UITableView(单元格未对齐)
- xml - 对 XML Schema 进行排序,而不对命名元素的子项重新排序