首页 > 解决方案 > 带有演员的简单 Akka 整数列表加法器

问题描述

我正在努力解决这个练习,但我找不到让它工作的方法,即使在从 Akka 网站(https://doc.akka.io/docs/akka/current/typed/actors.html )复制示例之后)。

我需要基本上只是将整数列表拆分为 n 个分区,将每个分区分配给每个 n 个参与者,让参与者对它们的所有值求和,最后向父参与者返回一条带有结果的消息,这会将其添加到累加器变量中. 我不需要做任何花哨的事情,比如日志记录、容错、性能优化等等。

import AdderMain.StartJob
import AdderWorker.ProcessDone
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}

object AdderWorker {
    final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
    final case class ProcessDone(result: Int)

    def apply(): Behaviors.Receive[Process] =
        Behaviors.receiveMessage[Process] {
            case Process(replyTo, payloadPartition) =>
                println(s"Worker is processing data...")
                val result = sumListElements(payloadPartition)
                replyTo ! ProcessDone(result)
                Behaviors.same
        }

    private def sumListElements(list: List[Int]): Int = list match {
        case Nil => 0
        case x :: xs => x + sumListElements(xs)
    }
}

object AdderMain extends App {

    final case class StartJob(payload: List[Int])

    def apply(payload: List[Int]): Behavior[ProcessDone] =
        Behaviors.setup { context =>
            val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
            payloadPartitions.foreach { l =>
                val worker = context.spawn(AdderWorker(), "Stuff")
                worker ! AdderWorker.Process(context.self, l)
            }

            Behaviors.receiveMessage { message =>
                println(s"Done ${message}")
                Behaviors.same
            }
        }

    val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

    val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")
}

如您所见,为每个列表分区AdderMain生成一个AdderWorker并发送包含列表的“进程”消息。我得到的错误是

Error:(45, 72) type mismatch;
 found   : akka.actor.typed.Behavior[AdderWorker.ProcessDone]
 required: akka.actor.typed.Behavior[Product with java.io.Serializable]
Note: AdderWorker.ProcessDone <: Product with java.io.Serializable, but class Behavior is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
    val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")

但它似乎源于我的代码中更大的问题......请随意纠正我我刚刚开始使用 Scala、Akka 和函数式编程。

PS:我还没有积累任何变量的结果

编辑:这是一个可行的解决方案

import java.util.UUID

import AdderWorker.ProcessDone
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}


object AdderWorker {
    final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
    final case class ProcessDone(result: Int)

    def apply(): Behaviors.Receive[Process] =
        Behaviors.receiveMessage[Process] {
            case Process(replyTo, payloadPartition) =>
                println(s"Worker is processing data...")
                val result = sumListElements(payloadPartition)
                replyTo ! ProcessDone(result)
                Behaviors.stopped
        }

    private def sumListElements(list: List[Int]): Int = list match {
        case Nil => 0
        case x :: xs => x + sumListElements(xs)
    }
}

object BasicAdder extends App {
    var counter = 0
    var max = 0
    var sum = 0

    final case class StartJob(payload: List[Int])

    def apply(payload: List[Int]): Behavior[ProcessDone] =
        Behaviors.setup { context =>
            val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
            max = payloadPartitions.size

            payloadPartitions.foreach { l =>
                val worker = context.spawn(AdderWorker(), UUID.randomUUID.toString)

                // Send the Process message to the newly spawned worker
                // providing a reference to the guardian actor and a partition of the payload
                worker ! AdderWorker.Process(context.self, l)
            }

            Behaviors.receiveMessage { message =>
                // this is thread-safe
                sum += message.result
                counter += 1

                if (counter < max) {
                    println(s"Current sum: ${sum}")
                    Behaviors.same
                } else {
                    println(s"Final sum: ${sum}")
                    Behaviors.stopped
                }
            }
        }

        val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100) // result should be 550

    val system: ActorSystem[AdderWorker.ProcessDone] = ActorSystem(BasicAdder(payload), "AdderMain")
}

标签: scalaakka

解决方案


推荐阅读