首页 > 解决方案 > 使用 akka 持久性进行批量操作,有哪些选择?

问题描述

假设我正在使用 akka 持久性,并且我支持像用户这样的东西。

如果有一项任务需要扫描所有用户,以及任何已过期的用户将其对象标记为已过期。

在使用 sql 的更传统的设置中,您只需执行以下操作:

update u
  set u.is_expired=1
from users u
where u.expired_at >= getdate()

现在,如果您这样做,您的 akka 持久性将不同步,您将不得不以某种方式向所有参与者广播以重新加载。

或者您必须向所有演员发送广播以检查您是否已过期。

如果您有数百万用户,您有哪些现实的选择?如果这是一个数据库存储过程,这种类型的查询可以在几秒钟内完成。

试图了解如何使用 akka 和 akka-persistance 来完成此操作。

标签: scalaakkaakka-persistence

解决方案


使用 Akka Persistence 有两种容易获得的方法来做这样的事情。两者都利用Persistence Query来查询事件流。

如果实体数量很少,您可以使用currentPersistenceIds()查询(我知道的所有持久性实现都支持此查询)来获取当时存在的实体流(流限制和背压可能会派上用场此处)并向每个实体的关联持久性参与者发送命令以检查是否过期。

在某个时间点之后,拥有一个单独的数据库来维护将实体 ID 映射到到期时间的实体的视图可能是有意义的。为此,您可能会使用eventsByTag查询来获取标记为的事件流,例如"affects-expiration"; 流中的稍后阶段然后更新该数据库。然后批处理作业可以查询该数据库并发出过期命令。

DB 的替代方案是拥有一个持久性参与者,它维护一组未过期的实体及其过期时间。该参与者可以是单例,也可以以能够一致地确定哪个特定参与者将维护给定实体的到期时间的方式进行分片。它可以通过eventsByTag流(最终是一致的)或由实体参与者自己更新(更加一致,但一般要注意不要有比您需要的更多的一致性)。


推荐阅读