首页 > 解决方案 > Akka:在邮箱中保留不匹配的消息

问题描述

我熟悉 Erlang/Elixir,其中进程邮箱中的消息保留在邮箱中,直到它们匹配:

这些模式Pattern按时间顺序与邮箱中的第一条消息顺序匹配,然后是第二条,依此类推。如果匹配成功并且可选的保护序列GuardSeq为真,Body则评估对应的。匹配的消息被消费,即从邮箱中删除,而邮箱中的任何其他消息保持不变。

http://erlang.org/doc/reference_manual/expressions.html#receive

但是,对于 Akka Actor,不匹配的消息会从邮箱中删除。例如,在哲学家就餐模拟中实现分叉时,这很烦人:

import akka.actor._

object Fork {
  def props(id: Int): Props = Props(new Fork(id))
  final case class Take(philosopher: Int)
  final case class Release(philosopher: Int)
  final case class TookFork(fork: Int)
  final case class ReleasedFork(fork: Int)
}

class Fork(val id: Int) extends Actor {
  import Fork._

  object Status extends Enumeration {
    val FREE, TAKEN = Value
  }

  private var _status: Status.Value = Status.FREE
  private var _held_by: Int = -1

  def receive = {
    case Take(philosopher) if _status == Status.FREE => {
      println(s"\tPhilosopher $philosopher takes fork $id.")
      take(philosopher)
      sender() ! TookFork(id)
      context.become(taken, false)
    }
    case Release(philosopher) if _status == Status.TAKEN && _held_by == philosopher => {
      println(s"\tPhilosopher $philosopher puts down fork $id.")
      release()
      sender() ! ReleasedFork(id)
      context.unbecome()
    }
  }

  def take(philosopher: Int) = {
    _status  = Status.TAKEN
    _held_by = philosopher
  }

  def release() = {
    _status  = Status.FREE
    _held_by = -1
  }
}

当一条Take(<philosopher>)消息被发送到分叉时,我们希望消息一直留在邮箱中,直到分叉被释放并且消息被匹配。但是,在 AkkaTake(<philosopher>)中,如果当前采用分叉,则从邮箱中删除消息,因为没有匹配项。

目前,我通过重写unhandledFork actor 的方法并将消息再次转发到 fork 来解决这个问题:

override def unhandled(message: Any): Unit = {
  self forward message
}

我相信这是非常低效的,因为它会一直将消息发送到分叉,直到匹配为止。有没有另一种方法来解决这个问题,不涉及不断转发不匹配的消息?

我相信在最坏的情况下,我将不得不实现一个模仿 Erlang 邮箱的自定义邮箱类型,如下所述:http: //ndpar.blogspot.com/2010/11/erlang-explained-selective-receive.html


编辑:我根据 Tim 的建议修改了我的实现,并按照建议使用了 Stash 特征。我的Fork演员现在看起来如下:

class Fork(val id: Int) extends Actor with Stash {
  import Fork._

  // Fork is in "taken" state
  def taken(philosopher: Int): Receive = {
    case Release(`philosopher`) => {
      println(s"\tPhilosopher $philosopher puts down fork $id.")
      sender() ! ReleasedFork(id)
      unstashAll()
      context.unbecome()
    }
    case Take(_) => stash()
  }

  // Fork is in "free" state
  def receive = {
    case Take(philosopher) => {
      println(s"\tPhilosopher $philosopher takes fork $id.")
      sender() ! TookFork(id)
      context.become(taken(philosopher), false)
    }
  }
}

但是,我不想到处写stash()andunstashAll()调用。相反,我想实现一个自定义邮箱类型来为我执行此操作,即存储未处理的消息并在参与者处理消息时取消存储它们。这可能吗?

我尝试实现一个自定义邮箱来执行此操作,但是,我无法确定消息是否与接收块匹配。

标签: scalaakkaakka-actor

解决方案


问题forward在于,如果有多个消息等待处理,它可能会重新排序消息,这可能不是一个好主意。

这里最好的解决方案似乎是在提供你想要的语义的actor内部实现你自己的队列。如果您不能立即处理消息,则将其放入队列中,当下一条消息到达时,您可以尽可能多地处理队列。这也将允许您检测发件人何时提供不一致的消息(例如Release,在他们没有的分叉上Take),否则只会在传入邮箱中建立。

在您证明这是一个问题之前,我不会担心效率,但如果每个接收函数只处理与该特定状态相关的消息,效率会更高。


我会var通过将状态放入方法的参数中来避免在actor中receive使用。并且该_status值隐含在接收处理程序的选择中,不需要存储为值。taken接收处理程序只需要处理消息,主Release接收处理程序只需要处理Take消息。


推荐阅读