microservices - CQRS - 乱序消息
问题描述
假设我们有 3 个不同的服务产生事件,每个服务都发布到自己的事件存储。
这些服务中的每一个都使用其他生产者服务事件。这是因为每个服务都必须处理另一个服务的事件并创建自己的投影。每个服务都在多个实例上运行。
(对我来说)最直接的方法是在每个 ES 前面放置“一些东西”,这些 ES 正在挑选事件并将它们发布(发布/订阅)到所有其他服务的队列中。
这是完美的,因为每个服务都可以订阅它喜欢的每个主题,而事件发布者正在做这项工作,如果服务不可用,事件仍然会被传递。在我看来,这可以保证高可扩展性和可用性。
我的问题是队列。我无法获得一个易于扩展的队列来保证消息的排序。它实际上保证了至少一次交付的“轻微故障”:明确地说,它是 AWS SQS。
所以,排序问题是:
- 不能保证来自同一事件流的事件的顺序。
- 来自同一 ES 的事件不保证顺序。
- 来自不同 ES(不同服务)的事件不保证顺序。
我虽然可以通过跟踪来自同一 ES 的事件的“序列号”来解决前两个问题。这将通过跟踪我们从中消费事件的每个主题的最后一个序列号来完成。这应该很容易对事件做出反应并构建我们的投影。然后,当我从队列中弹出一个事件时,如果eventSequenceNumber > previousAppliedEventSequenceNumber + 1
我将它排入队列(或使其在一段时间内不可见)。
但事实证明,使用这个解决方案,当事件以高速率产生时,它会破坏性能(我可以使用可见性超时或其他东西,结果应该是一样的)。
这是因为当我期待事件 10 并且我暂时忽略事件 11 时,我也应该忽略所有事件(来自 ES),序列号在事件 11 之后,直到事件 11 再次出现并且它被有效处理。
其他困难是:
- 在哪里跟踪事件的序列号以构建投影。
- 如何跟踪事件的序列号以构建投影,以便在应用它时,我有一个一致的
lastSequenceNumber
.
我错过了什么?
PS:对于第三个问题,请考虑以下情况。我们有一个UserService
和一个CartService
。有CartService
一个投影,每个用户都可以跟踪购物车中的产品。每个购物车的投影还必须包含用户名和其他信息,这些信息来自UserCreated
从UserService
. 如果UserCreated
在ProductAddedToCart
正常流程之后需要抛出异常,因为用户还不存在。
解决方案
我错过了什么?
您缺少流程——消费者从源中提取消息,而不是让源将消息推送给消费者。
当我醒来时,我会检查我的书签以找出我最后阅读了您的哪些消息,然后询问您从那以后是否有任何消息。如果有,我会按顺序从您那里检索它们(想想“文档消息”),同时记下新的书签。然后我回去睡觉。
推送通知的主要目的是中断睡眠期(从而减少延迟)。
将 SQS 用作队列,其想法是您一次读取所有排队的消息。如果没有间隙,那么您可以订购集合,然后开始处理它们并确认它们。如果存在间隙,您要么等待(将消息留在队列中),要么前往事件存储区获取丢失消息的副本。
没有什么神奇的——如果消息管道承诺“至少一次”传递,那么消费者必须采取措施在重复消息到达时识别它们。
如果 UserCreated 出现在 ProductAddedToCart 之后,则正常流程需要引发异常,因为用户还不存在。
审查种族条件不存在,作者 Udi Dahan:“时间上的微秒差异不应该对核心业务行为产生影响。”
推荐阅读
- angular - 无法在 Angular 单元测试中为反应形式设置 Formgroup 的“有效”属性
- spring-batch - 如何使用 Spring Batch 将源数据库表中的所有记录更新到目标数据库表?
- sql - 对带有空格的列进行 CQL 查询
- xcode - 使用外部构建在 Xcode 10 中设置 fortran 项目
- node.js - AssertionError [ERR_ASSERTION]:表达式评估为假值:
- c++ - C++ 服务器/Web 客户端,仅运行初始消息
- android - Android Chrome Webview 无法访问 Google Pay
- c# - 为什么 CloudBlobClient 不包含 GetBlockBlobReference 的定义?
- java - 还有另一种方法来加载吗?
- python - 手写文字识别