cqrs - Axon 框架:仅处理由同一 JVM 实例发布的事件?
问题描述
嗨 Axon 框架社区,
我想就如何正确解决以下问题征求您的意见。
我的轴突测试设置
- 同一个 Spring Boot 应用程序的两个实例(使用 axon-spring-boot-starter 4.4 没有 Axon Server)
- 每个实例都会定期发布相同的事件
- 两个实例都连接到同一个 EventSource(使用 JpaEventStorageEngine 的单个 SQL Server 实例)
- 每个实例都配置为使用 TrackingEventProcessors
- 每个实例都注册了相同的事件处理程序
我想要达到的目标
我希望一个实例发布的事件仅由同一个实例处理
如果 instance1 发布 eventX,那么只有 instance1 应该处理 eventX
到目前为止我尝试过的
- 我可以使用 SubscribingEventProcessor 实现上述场景。不幸的是,这不是我的选择,因为我们希望可以选择重播事件以重建/添加新的查询模型。
- 我可以将每个实例的事件处理程序分配给不同的处理组。不幸的是,这没有奏效。也许是因为每个 TrackingEventProcessors 实例都处理相同的 EventStream ?- 虽然不太确定。
- 我可以实现一个 MessageHandlerInterceptor ,它只在事件来源来自同一个实例的情况下才会继续。这是我到目前为止实现的并且可以正常工作的内容: MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {
override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
if(stackId == stackProperties.id){
interceptorChain?.proceed()
}
return null
}
}
@Configuration
class AxonConfiguration {
@Autowired
fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
val processingGroup = "processing-group-stack-${stackProperties.id}"
eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
}
}
有更好的解决方案吗?
我的印象是我当前的解决方案并不是最好的,因为理想情况下,我希望只有属于某个实例的事件处理程序由 TrackingEventProcessor 实例触发。
你会怎么解决?
解决方案
您在这里遇到的有趣场景@thowimmer。我的第一个预感是说“SubscribingEventProcessor
改用”。但是,您指出这不是您的设置中的一个选项。我认为对于处于相同情况的其他人来说,知道为什么这不是一种选择是非常有价值的。所以,也许你可以详细说明一下(说实话,我也很好奇)。
现在为您的问题案例确保事件仅在同一个 JVM 中处理。将来源添加到事件中绝对是您可以采取的一个步骤,因为这允许以逻辑方式进行过滤。“这个事件起源于my.origin()
?” 如果没有,您只需忽略该事件并完成它,就这么简单。不过,还有另一种方法可以实现这一点,我稍后会谈到。
但是,我认为过滤的地方主要是您要寻找的地方。但首先,我想先说明 为什么需要过滤。正如您所注意到的,TrackingEventProcessor
(TEP)从所谓的StreamableMessageSource
. 这EventStore
是一个这样的实现StreamableMessageSource
。当您将所有事件存储在同一个存储中时,它只会将所有内容流式传输到您的 TEP。由于您的事件是单个事件流的一部分,因此您需要在某个阶段过滤它们。使用 aMessageHandlerInterceptor
可以工作,您甚至可以编写 aHandlerEnhacnerDefinition
允许您向事件处理函数添加额外的行为。但是,尽管您这么说,但在当前设置下,需要在某处进行过滤。可以说是最MessageHandlerInterceptor
简单的地方。
但是,有一种不同的方法来处理这个问题。为什么不将您的 Event Store 分离为两个应用程序的不同实例?显然他们不需要互相读取,那么为什么要共享同一个事件存储呢?在不了解您的领域的进一步背景的情况下,我猜您本质上是在处理驻留在不同有界上下文中的应用程序。简而言之,与应用程序/上下文共享所有内容的兴趣为零,您只需非常有意识地彼此共享您的领域语言的特定部分。
请注意,支持多个上下文,在中间使用单个通信集线器,这正是Axon Server 可以为您实现的。我不是在这里说你不能自己配置这个,我过去做过。但是将这项工作留给某人或其他人,让您无需配置基础设施,这将节省大量时间。
希望这可以帮助您设置我对此事@thowimmer 的一些想法。
推荐阅读
- android - Firebase Crashlytics SDK 无法识别 Crashlytics 类
- sh - 当为 sh -c 和 su 传入字符串时,如何让管道工作
- flutter - 如何映射整个文档子集合的数据并使其成为 Flutter 中的流?
- python - 使用 Python 将编写的代码应用于不同文件夹中的所有 CSV 文件
- scala - 反序列化选项返回 None 而不是 json4s 中的异常
- c - 如何使用 Ethernet Shield 将数据从 Arduino Uno 发送到 MySQL 数据库
- laravel - Laravel 刀片:第一次加载页面时获取未定义的变量,刷新时它工作正常
- django - 通过传递字段名称而不是主 ID 获取模型实例
- php - 如何用 PHP 替换 SQL Server 显示的输出
- presto - Presto:如何填补空白并从前几行复制值