scala - 在启动主逻辑之前处理 Actor 需求
问题描述
看看下面的图表:
在这种情况下,我们有两个经理。ManagerA 和 ManagerB,都是由同一个父亲同时产生的。ManagerB 还立即生成 3 个子节点,它们开始处理一些数据。当工作人员完成处理并有结果时,他向 ManagerB 发送返回结果。
然后 ManagerB 通知 ManagerA 他已经准备好一个名为 B1 的对象。然后 ManagerA 告诉 A1、A2 和 A3 B1 已准备好并将其传递给他们。但只有 A1 和 A2 需要 B1,所以他们保留它,而 A3 丢弃该消息。A1 有他的所有要求,现在他开始执行他的主要逻辑,因为他有 B1,所以他这样做了。同时A2还需要B2,A3需要B3。
如何在 Scala 中使用 akka actor 来实现这样的逻辑?我很难找到一种方法来以功能性的方式保存需求并在最终满足所有需求时开始执行。
解决方案
我将使用的基本思想是在案例类中对状态进行编码,其中至少具有一组要求和一个映射,其键是要求的(非严格)子集,其值是 B 工作人员的结果,例如对于A1
,我们可能有:
requirements = Set("B1")
received = Map("B1" -> B1Result(...))
那么你的actor的行为就是它的状态的函数:改变状态的消息会改变行为。
更具体地说,您可能会执行以下操作:
object AWorker {
type BResult = Any // BResult would probably defined elsewhere, I'm aliasing Any for brevity
type Action = (ActorContext[Command], Map[String, BResult]) => Unit
sealed trait Command
case class RequirementsAndDo(
requirements: Set[String],
action: Action,
replyTo: Option[ActorRef[Reply]]
) extends Reply
case class Completed(key: String, value: BResult, replyTo: Option[ActorRef[Reply]]) extends Reply
sealed trait Reply
case object Ack extends Reply
def apply(): Behavior[Command] = withState(State())
private[AWorker] case class State(
requirements: Set[String] = Set.empty,
received: Map[String, BResult] = Map.empty,
whenRequirementsMet: Option[Action] = None
) {
def updateRequirements(req: Set[String], f: Option[Action]): State = {
// For now, start anew...
State(req)
}
def completeRequirement(req: String, v: BResult): State =
copy(received = received.updated(req, v))
def checkAndPerform(ctx: ActorContext[Command]): State = {
if (requirements.diff(received.keySet).isEmpty) {
whenRequirementsMet.foreach { a =>
a(ctx, received)
}
State()
} else this
}
private def withState(state: State): Behavior[Command] =
Behaviors.receive { (context, msg) =>
msg match {
case RequirementsAndDo(req, action, replyTo) =>
val nextState = state.updateRequirements(req, action)
replyTo.foreach(_ ! Ack)
withState(nextState)
case Completed(key, value, replyTo) =>
val intermediateState = state.completeRequirement(key, value)
replyTo.foreach(_ ! Ack)
withState(intermediateState.checkAndPerform(context))
}
}
}
您可以通过让方法返回一组效果和新状态来使其更加 FP State
,但这个 IMO 接近最佳实用功能。请特别注意,这鼓励将域效果(例如围绕工作完成)与实现协议效果(例如回复)分开。
推荐阅读
- python - 如何使用 CreateView?django.template.exceptions.TemplateDoesNotExist:django/forms/widgets/text.html
- linux - 在匹配模式之前打印单词
- javascript - 如何分解并一次仅显示三个变体中的一个?
- laravel - 在 Laravel 资源中使用 groupBy
- api - Go api 在同事的计算机上工作,端点在我的计算机上返回 404?
- r - 故障排除警告消息,条件长度 > 1,仅使用第一个元素
- php - 1970 年 1 月 1 日的时间戳变为 -3600
- python - NGINX 和 Docker 中的 Gunicorn 中的权限错误:connect() 到 unix:/tmp/gunicorn.sock 在连接到上游时失败(13:权限被拒绝)
- c# - 在 JSON 响应中访问数组的问题
- c++ - C++中二维数组的大小