apache-kafka - 使用 kafka 和 cassandra 进行事件溯源的类别预测
问题描述
我正在使用 Cassandra 和 Kafka 进行事件溯源,而且效果很好。但我最近刚刚发现了设计/设置中的一个潜在重大缺陷。简要介绍它是如何完成的:
聚合命令处理程序基本上是一个 kafka 消费者,它消费关于某个主题的感兴趣的消息:
1.1 当它接收到一个命令时,它为聚合加载所有事件,并为每个事件重放聚合事件处理程序以使聚合达到当前状态。
1.2 基于命令和业务逻辑,然后将一个或多个事件应用于事件存储。这涉及将新事件插入到 cassandra 中的事件存储表中。事件标记有聚合的版本号 - 从版本 0 开始用于新聚合,使预测成为可能。此外,它将事件发送到另一个主题(用于投影目的)。
1.3 kafka 消费者将在这些事件发布后监听主题。该消费者将充当投影仪。当它接收到感兴趣的事件时,它会加载聚合的当前读取模型。它检查它收到的事件的版本是否是预期的版本,然后更新读取模型。
这似乎工作得很好。问题是当我想要 EventStore 所谓的类别投影时。我们以 Order 聚合为例。我可以轻松地投射一个或多个阅读模型 pr Order。但是,如果我想有一个包含客户 30 个最后订单的投影,那么我需要一个类别投影。
我只是在摸索如何做到这一点。我很想知道是否有其他人正在使用 Cassandra 和 Kafka 进行事件采购。我读过一些人们不鼓励它的地方。也许这就是原因。
我知道 EventStore 支持这个内置的。也许使用 Kafka 作为事件存储会是一个更好的解决方案。
解决方案
使用这种架构,您必须选择:
- 每个类型的全局事件流 - 简单
- 每种类型的分区事件流 - 可扩展
除非您的系统具有相当高的吞吐量(例如,对于所讨论的流类型,持续时间每秒至少 10 秒或 100 秒的事件),否则全局流是更简单的方法。一些系统(例如事件存储)通过非常细粒度的流(例如每个聚合实例)为您提供两全其美的流,但能够将它们组合成更大的流(每个流类型/类别/分区,每个多种流类型等)以开箱即用的高性能和可预测方式,同时仍然很简单,只需要您跟踪单个全局事件位置。
如果您使用 Kafka 进行分区:
- 在处理需要进入相同模型的不同分区的事件时,您的投影代码将需要处理访问相同读取模型的并发消费者组。根据您的投影目标存储,有很多方法可以处理此问题(事务、乐观并发、原子操作等),但这对于某些目标存储来说会是个问题
- 您的投影代码将需要跟踪每个分区的流位置,而不仅仅是单个位置。如果您的投影从多个流中读取,它必须跟踪很多位置。
使用全局流消除了这两个问题——性能通常可能足够好。
在任何一种情况下,您可能还希望将流位置获取到长期事件存储(即 Cassandra)中 - 您可以通过从事件流(分区或全局)读取专用进程并仅更新事件来做到这一点在 Cassandra 中,每个事件的全局或分区位置。(我与 MongoDB 有类似的事情 - 我有一个读取“oplog”并将 oplog 时间戳复制到事件中的过程,因为 oplog 时间戳是完全有序的)。
另一种选择是从初始命令处理中删除 Cassandra 并改用 Kafka Streams:
- 通过与聚合的分区 KTable 连接来处理分区命令流
- 计算命令结果和事件
- 原子地,KTable 使用更改的聚合更新,事件写入事件流,命令响应写入命令响应流。
然后,您将拥有一个下游事件处理器,它将事件复制到 Cassandra 中以便于查询等(并且它可以将 Kafka 流位置添加到每个事件,因为它可以给类别排序)。如果您不想使用 Kafka 进行长期事件存储,这有助于赶上订阅等。(为了赶上进度,您只需尽可能多地阅读 Cassandra 的内容,然后从上一个 Cassandra 事件的位置切换到来自 Kafka 的流式传输)。另一方面,Kafka 本身可以永久存储事件,所以这并不总是必要的。
我希望这有助于理解您可能遇到的权衡和问题。
推荐阅读
- java - org.apache.hadoop.fs.Path 文件可以与 kubernetes 上的 minio 存储桶一起使用
- console - 在 ATEN 串行控制台服务器上配置 UDP 模式
- javascript - Why does this await outside async function work?
- firebase - Daterangepicker firebase android工作室
- javascript - 无法从数组中动态找到对象
- flutter - 如何在 Flutter 中更改 .g.dart 的位置
- html - 为什么我的按钮与列中的弹性框内联?
- mongodb - 如何使用分子框架设置 MongoDB
- amazon-s3 - 由于未启用存储库,AWS EB 部署不起作用
- node.js - 无法读取未定义的属性“findOne””