cqrs - Axon 框架的排序策略如何在状态方面起作用
问题描述
在 Axon 的参考指南中写道
除了这些提供的策略之外,您还可以定义自己的策略。所有策略都必须实现 SequencingPolicy 接口。此接口定义了一个方法 getSequenceIdentifierFor,它返回给定事件的序列标识符。返回相同序列标识符的事件必须按顺序处理。可以同时处理产生不同序列标识符的事件。
更重要的是,在这个线程的最后一条消息中,它说
使用排序策略,您可以指示哪些事件需要按顺序处理。线程是在同一个 JVM 中还是在不同的 JVM 中都没有关系。如果排序策略为 2 条消息返回相同的值,则它们将保证按顺序处理,即使您跨多个 JVM 跟踪处理器线程也是如此。
那么这是否意味着事件处理器实际上是无状态的?如果是,那么他们如何设法同步?令牌存储是否用于此目的?
解决方案
我认为这取决于您将什么视为状态,但我假设从您的角度来看,是的,EventProcessor
Axon 中的实现确实是无状态的。
当事件发生时,它会从(实现此接口)SubscribingEventProcessor
接收它的事件。自己闲暇时从(实现此接口)检索它的事件。SubscribableMessageSource
EventBus
TrackingEventProcessor
StreamableMessageSource
EventStore
后一个版本需要跟踪它在事件流上的事件的位置。此信息存储在 中TrackingToken
,由 保存TokenStore
。一个给定的TrackingEventProcessor
线程只有在对它所属的处理组提出声明时才能处理事件。TrackingToken
因此,这确保了相同的事件不会被两个不同的线程处理以意外更新相同的查询模型。也允许多线程处理这个TrackingToken
过程,这是通过对令牌进行分段来完成的。段数(可通过 调整initialSegmentCount
)驱动给定处理组将被分区的片数。从TrackingToken
的角度来看TokenStore
,这意味着您将有几个TrackingToken
存储的实例等于您设置的段数。
它的SequencingPolicy
工作是驱动流中的哪些事件属于哪个段。这样做,您可以例如使用SequentialPerAggregate
SequencingPolicy
来确保具有给定聚合标识符的所有事件都由一个段处理。
推荐阅读
- android - Android 如何检测按下的自动填充?
- go - 如何理解容量为 C 的通道上的第 k 次接收发生在该通道的第 k+C 次发送完成之前?
- firebase - 错误:参数类型“TextEditingController”无法分配给参数类型“String”。在 FLUTTER
- python - 为什么这个奇怪的执行时间
- javascript - Plotly 工具提示中的自定义数据未显示
- python - 将多个字典存储在列表中并遍历列表
- python - visibility_of_element_located 或 element_to_be_clickable - 最终解决方案?
- video - 无法在 UWP 中使用 MediaPlayerElement 播放 mp4 流
- apache-flink - Flink-Kafka Flink 作业在启动期间读取 kafka 记录并在 AWS-KDA 上启动失败
- google-apps-script - 在谷歌应用程序脚本中获取文件夹名称