scala - 仅保留最新消息的节流 akka-actor
问题描述
情况
我正在使用 akka 演员来更新我的网络客户端上的数据。其中一个参与者仅负责发送有关 single Agent
s 的更新。这些代理更新得非常快(每 10 毫秒)。我现在的目标是限制这种更新机制,以便Agent
每 300 毫秒发送一次的最新版本。
我的代码
到目前为止,这是我想出的:
/**
* Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
*/
class BroadcastSingleAgentActor extends Actor {
private implicit val ec: ExecutionContextExecutor = context.dispatcher
private var queue = Set[Agent]()
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
queue = Set()
}
override def receive: Receive = {
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
}
问题
这个演员(BroadcastSingleAgentActor
)按预期工作,但我不能 100% 确定这是否是线程安全的(更新queue
时可能会清除它)。此外,这并不意味着我在充分利用 akka 提供给我的工具。我找到了这篇文章(Throttling Messages in Akka 2),但我的问题是我需要保留最新Agent
消息,同时删除它的任何旧版本。有没有类似于我需要的例子?
解决方案
不,这不是线程安全的,因为通过 的调度ActorSystem
将发生在receive
. 一种可能的想法是在receive
方法内进行调度,因为传入的消息BroadcastSingleAgentActor
将按顺序处理。
override def receive: Receive = {
case Refresh =>
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
}
queue = Set()
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
推荐阅读
- javascript - Firestore 功能触发器不会触发
- r - 将行与 R 中的两个匹配列合并
- java - Jersey Spring Boot 添加自定义响应头
- javascript - str.link() ,无法读取未定义错误的属性“链接”
- powerbi - PowerBI 计算组中具有值的所有行
- marklogic - MarkLogic - XQuery 语句的顺序处理
- symfony - 在 Symfony 4.1 中定义合成服务在捆绑包中不起作用
- c# - malloc:在 JSON 反序列化期间未分配对象指针被释放的 *** 错误
- kubernetes - 在没有负载均衡器的情况下,在 Digital Ocean 的托管 Kubernetes 上公开端口 80
- ruby-on-rails - 如何验证模型中的条件?