首页 > 解决方案 > 仅保留最新消息的节流 akka-actor

问题描述

情况

我正在使用 akka 演员来更新我的网络客户端上的数据。其中一个参与者仅负责发送有关 single Agents 的更新。这些代理更新得非常快(每 10 毫秒)。我现在的目标是限制这种更新机制,以便Agent每 300 毫秒发送一次的最新版本。

我的代码

到目前为止,这是我想出的:

/**
  * Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
  */
class BroadcastSingleAgentActor extends Actor {

    private implicit val ec: ExecutionContextExecutor = context.dispatcher
    private var queue = Set[Agent]()

    context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
        queue.foreach { a =>
            broadcastAgent(self)(a) // sends the message to all connected clients
        }
        queue = Set()
    }

    override def receive: Receive = {
        // this message is received every 10 ms for every agent present
        case BroadcastAgent(agent) => 
           // only keep the newest version of the agent
           queue = queue.filter(_.id != agent.id) + agent
    }

}

问题

这个演员(BroadcastSingleAgentActor)按预期工作,但我不能 100% 确定这是否是线程安全的(更新queue时可能会清除它)。此外,这并不意味着我在充分利用 akka 提供给我的工具。我找到了这篇文章(Throttling Messages in Akka 2),但我的问题是我需要保留最新Agent消息,同时删除它的任何旧版本。有没有类似于我需要的例子?

标签: scalaakkathrottlingakka-actor

解决方案


不,这不是线程安全的,因为通过 的调度ActorSystem将发生在receive. 一种可能的想法是在receive方法内进行调度,因为传入的消息BroadcastSingleAgentActor将按顺序处理。

  override def receive: Receive = {

    case Refresh =>
      context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
        queue.foreach { a =>
          broadcastAgent(self)(a) // sends the message to all connected clients
        }
      }
      queue = Set()
    // this message is received every 10 ms for every agent present
    case BroadcastAgent(agent) =>
      // only keep the newest version of the agent
      queue = queue.filter(_.id != agent.id) + agent
  }

推荐阅读