首页 > 解决方案 > 在启动主逻辑之前处理 Actor 需求

问题描述

看看下面的图表:

图表

在这种情况下,我们有两个经理。ManagerA 和 ManagerB,都是由同一个父亲同时产生的。ManagerB 还立即生成 3 个子节点,它们开始处理一些数据。当工作人员完成处理并有结果时,他向 ManagerB 发送返回结果。

然后 ManagerB 通知 ManagerA 他已经准备好一个名为 B1 的对象。然后 ManagerA 告诉 A1、A2 和 A3 B1 已准备好并将其传递给他们。但只有 A1 和 A2 需要 B1,所以他们保留它,而 A3 丢弃该消息。A1 有他的所有要求,现在他开始执行他的主要逻辑,因为他有 B1,所以他这样做了。同时A2还需要B2,A3需要B3。

如何在 Scala 中使用 akka actor 来实现这样的逻辑?我很难找到一种方法来以功能性的方式保存需求并在最终满足所有需求时开始执行。

标签: scalaakkaactorakka-typed

解决方案


我将使用的基本思想是在案例类中对状态进行编码,其中至少具有一组要求和一个映射,其键是要求的(非严格)子集,其值是 B 工作人员的结果,例如对于A1,我们可能有:

requirements = Set("B1")
received = Map("B1" -> B1Result(...))

那么你的actor的行为就是它的状态的函数:改变状态的消息会改变行为。

更具体地说,您可能会执行以下操作:

object AWorker {
  type BResult = Any  // BResult would probably defined elsewhere, I'm aliasing Any for brevity

  type Action = (ActorContext[Command], Map[String, BResult]) => Unit

  sealed trait Command

  case class RequirementsAndDo(
    requirements: Set[String],
    action: Action,
    replyTo: Option[ActorRef[Reply]]
  ) extends Reply

  case class Completed(key: String, value: BResult, replyTo: Option[ActorRef[Reply]]) extends Reply

  sealed trait Reply
  case object Ack extends Reply

  def apply(): Behavior[Command] = withState(State())

  private[AWorker] case class State(
    requirements: Set[String] = Set.empty,
    received: Map[String, BResult] = Map.empty,
    whenRequirementsMet: Option[Action] = None
  ) {
    def updateRequirements(req: Set[String], f: Option[Action]): State = {
      // For now, start anew...
      State(req)
    }

    def completeRequirement(req: String, v: BResult): State =
      copy(received = received.updated(req, v))

    def checkAndPerform(ctx: ActorContext[Command]): State = {
      if (requirements.diff(received.keySet).isEmpty) {
        whenRequirementsMet.foreach { a =>
          a(ctx, received)
        }
        State()
      } else this
  }

  private def withState(state: State): Behavior[Command] =
    Behaviors.receive { (context, msg) =>
      msg match {
        case RequirementsAndDo(req, action, replyTo) =>
          val nextState = state.updateRequirements(req, action)
          replyTo.foreach(_ ! Ack)
          withState(nextState)
        case Completed(key, value, replyTo) =>
          val intermediateState = state.completeRequirement(key, value)
          replyTo.foreach(_ ! Ack)
          withState(intermediateState.checkAndPerform(context))
      }
    }
}

您可以通过让方法返回一组效果和新状态来使其更加 FP State,但这个 IMO 接近最佳实用功能。请特别注意,这鼓励将域效果(例如围绕工作完成)与实现协议效果(例如回复)分开。


推荐阅读