cqrs - 如何将“EventStore”中的聚合/读取模型保存在数据库中?
问题描述
第一次尝试实现事件溯源和 CQRS,但在持久化聚合时遇到了困难。
这就是我现在的位置
- 我已经设置了“EventStore”一个流,“foos”
- 从连接到它
node-eventstore-client
- 我订阅了追赶事件
这一切都很好。
在eventAppeared
事件处理函数的帮助下,我可以在事件发生时构建聚合。这很棒,但我该怎么办?
假设我构建并聚合了一个 Foos 列表
[
{
id: 'some aggregate uuidv5 made from barId and bazId',
barId: 'qwe',
bazId: 'rty',
isActive: true,
history: [
{
id: 'some event uuid',
data: {
isActive: true,
},
timestamp: 123456788,
eventType: 'IsActiveUpdated'
}
{
id: 'some event uuid',
data: {
barId: 'qwe',
bazId: 'rty',
},
timestamp: 123456789,
eventType: 'FooCreated'
}
]
}
]
为了遵循 CQRS,我将在读取模型中构建上述聚合,对吗?但是如何将这个聚合存储在数据库中?
我想只需要一个 nosql 数据库就可以了,但我肯定需要一个 db,因为我会在这个和其他读取模型/聚合之前放置一个 gRPC APi。
但是,从构建聚合到何时将其持久保存在数据库中,我实际上该怎么做?
我曾经尝试按照本教程https://blog.insiderattack.net/implementing-event-sourcing-and-cqrs-pattern-with-mongodb-66991e7b72be进行操作,这非常简单,因为您将 mongodb 用作事件存储和只需为聚合创建一个视图并在新事件传入时更新该视图。它有它的缺陷和限制(聚合管道),这就是为什么我现在将事件存储部分转向“EventStore”。
但是如何持久化聚合,它目前只是从“EventStore”中的事件构建并存储在代码/内存中......?
我觉得这可能是一个愚蠢的问题,但我是否必须遍历数组中的每个项目并将每个项目插入 db 表/集合中,或者你是否有办法一次将整个数组/聚合转储到那里?
之后会发生什么?您是否为每个聚合创建一个物化视图并针对它进行查询?
我愿意为此选择最好的数据库,无论是 postgres/其他 rdbms、mongodb、cassandra、redis、表存储等。
最后一个问题。现在我只使用一个流“foos”,但在这个级别上,我希望新事件会非常频繁地发生(每隔几秒左右),但据我所知,你仍然会坚持它并使用物化来更新它意见对吗?
因此,鉴于 barId 和 bazId 组合可用于对事件进行分组,而不是单个流,我认为更专业的流(如 foos-barId-bazId)将是可行的方法,尝试减少传入新的频率事件到重新创建物化视图才有意义的地步。
如果更新频率低于某个限制,是否有一般的经验法则说不要重新创建/更新/刷新物化视图?那么唯一的另一种选择是从普通表/集合中查询?
编辑:
最后,我正在尝试制作一个只有 2 个 RPC 的 gRPC api——一个用于通过 id 获取单个 foo,一个用于获取所有 foo(具有用于按状态过滤的可选字段——但这并不重要)。简化的原型看起来像这样:
rpc GetFoo(FooRequest) returns (Foo)
rpc GetFoos(FoosRequest) returns (FooResponse)
message FooRequest {
string id = 1; // uuid
}
// If the optional status field is not specified, return all foos
message FoosRequest {
// If this field is specified only return the Foos that has isActive true or false
FooStatus status = 1;
enum FooStatus {
UNKNOWN = 0;
ACTIVE = 1;
INACTIVE = 2;
}
}
message FoosResponse {
repeated Foo foos;
}
message Foo {
string id = 1; // uuid
string bar_id = 2 // uuid
string baz_id = 3 // uuid
boolean is_active = 4;
repeated Event history = 5;
google.protobuf.Timestamp last_updated = 6;
}
message Event {
string id = 1; // uuid
google.protobuf.Any data = 2;
google.protobuf.Timestamp timestamp = 3;
string eventType = 4;
}
传入的事件看起来像这样:
{
id: 'some event uuid',
barId: 'qwe',
bazId: 'rty',
timestamp: 123456789,
eventType: 'FooCreated'
}
{
id: 'some event uuid',
isActive: true,
timestamp: 123456788,
eventType: 'IsActiveUpdated'
}
正如您所看到的,在 gRPC API 中没有 uuid 可以使 GetFoo(uuid) 成为可能,这就是为什么我将生成一个带有 barId 和 bazId 的 uuidv5,它们将组合为一个有效的 uuid。我在您在上面看到的投影/聚合中进行此操作。
此外,GetFoos rpc 将返回所有 foo(如果未定义状态字段),或者返回具有与状态字段匹配的 isActive 的 foo(如果指定)。
但是我不知道如何从追赶订阅处理程序继续。
我将事件存储在“EventStore”(https://eventstore.com/)中,使用带有追赶的订阅,我已经建立了一个聚合/投影,其中包含我想要的形式的 Foo 数组,但是能够要从我的 gRPC API 获取单个 Foo,我想我需要将整个聚合/投影存储在某种数据库中,以便我可以连接并从 gRPC API 获取数据?每次有新事件出现时,我都需要将该事件添加到数据库中,或者这是如何工作的?
我想我已经阅读了我可能在互联网上找到的所有资源,但我仍然缺少一些关键信息来解决这个问题。
gRPC 不是那么重要。我猜它可能是 REST,但我最大的问题是如何让 API 服务可以使用聚合/投影的数据(可能更多的 API 也需要它)?我想我需要将聚合/投影数据与生成的 uuid 和历史字段一起存储在数据库中,以便能够通过 uuid 从 API 服务中获取它,但是从追赶事件中,什么数据库以及这个存储过程是如何完成的我在哪里构建聚合的处理程序?
解决方案
我知道你的感受!这基本上是我第一次尝试做 CQRS 和 ES 时发生的事情。
我认为您的知识存在一些空白,我相信您会迅速填补这些空白。您可以在执行过程中对事件流中的聚合进行水合。那是您的聚合持续存在。读取模型是不同的。让我解释...
例如,您的读取模型是用于运行查询并提供数据以显示给 UI 的东西。您的聚合不(直接)参与其中。事实上,它们应该被封装。这意味着您无法从外部“看到”他们的状态。即除了具有getter 的聚合ID 之外,没有getter 和setter。
本文为您提供了有关如何将它们组合在一起的有用概述:CQRS + 事件溯源 - 一步一步
这个想法是,当聚合更改状态时,它只能通过它生成的事件来做到这一点。您将该事件存储在事件存储中。该事件也被发布,以便可以更新读取模型。
还查看您的聚合,它看起来更像是典型的读取模型对象或 DTO。聚合对功能感兴趣,而不是属性。因此,您会看到用于向聚合发出命令的无效公共函数。但不是像 isActive 或 history 这样的公共属性。
我希望这是有道理的。
编辑:
这里有一些更实用的建议。
“为了遵循 CQRS,我将在读取模型中构建上述聚合,对吗?”您不会在读取模型中构建聚合。它们是等式 CQRS 侧的不同侧的独立事物。聚合在命令端。针对不同于聚合的读取模型进行查询。
聚合具有公共 void 函数,没有 getter 或 setter(聚合 id 除外)。它们被封装。当它们的状态因发出命令而改变时,它们会生成事件。这些事件存储在事件存储中,用于恢复聚合的状态。换句话说,这就是聚合的存储方式。
事件继续被发布,因此事件处理程序和其他进程可以对它们做出反应并更新读取模型和/或触发新的级联命令。
“最后一个问题。现在我只使用一个流“foos”,但在这个级别上,我希望新事件会非常频繁地发生(每隔几秒左右),但据我所知,你仍然会坚持下去并且使用物化视图更新它,对吗?”
每隔几秒钟就很可能没问题。我更关心使用物化视图的持久化和更新。我不知道你的意思是什么,但听起来你的想法不正确。视图应该是非常简单的读取模型。无需像您在 RDMS 中找到的那样复杂的关系。因此针对阅读进行了高度优化。
推荐阅读
- android - 奇怪的错误配置“编译”已过时并已替换为“实现”和“API”
- javascript - 检查元素是否具有 href 如果它不应该为空 Cypress.io
- javascript - AmCharts 4 - 如何在 MapPolygonSeries() 中使用自定义 ajax 响应
- java - 升级到 spring-boot-2 后,SimpMessageHeaderAccessor#getUser 可以为空。怎么处理?
- reactjs - 我可以在 create-react-app v2 中使用 css 模块覆盖 Material-UI 吗?
- python-3.x - 为什么正则化强度负值不是正确的方法?
- c# - 通过变量传递图像 URL 时,Crystal Report 不显示动态图像
- excel - 如何在 excel 中使用高级过滤器来排除过滤的数据而不是包含它
- python - 通过其他键将列添加到具有非唯一 ID 的 pyspark 数据帧
- php - 自动将对象添加到数组