首页 > 解决方案 > Aggregates with Event Sourcing and CQRS

问题描述

In order to process large amounts of telemetry data and still be able to perform quick queries on the data, I'm adopting the Event Sourcing / CQRS patterns using Azure Functions and Azure Cosmos DB.

In my architecture, the inbound telemetry stream gets stored in a Cosmos DB Collection acting as event store.

To create materialized views of the raw telemetry data, I use another Azure Function with Cosmos DB Trigger which gets active on all new documents stored in my event store performing transformations on those documents.

This is quite easy working on a document per document basis. Where it get's tricky is, when I need to reference other documents in order to calculate my materialized view.

For example, when the received telemetry events contain relative counter values (e.g. energy used in a particular operation). In my materialized view I want to have a document containing a total sum of all energy consumption.

Now an easy implementation would be to look at the current state of this document in my materialized view and just increment this value by the newly received value.

The problem which i could get using this approach is when i have to recalculate my materialized views because in a future version i need to generate some additional views.

For recalculation, I would simply touch all related documents I want to recalculate in my event store, triggering the Azure Function which calculates the materialized views again. This would result in documents entering this Azure Function which were processed before.

When recalculation occurs, my counter would now not be accurate anymore if I simply increment my sum as documents that are already part of the sum would get added again.

Ways to solve this recalculation-scenario (i thought of) would be:

Could you give me some advice on how to properly solve that kind of scenarios?

标签: azure-functionsazure-cosmosdbcqrsevent-sourcingmaterialized-views

解决方案


因此,总结@Mikhail 和@RomanEremin 的评论以及我的想法,这将是处理这些情况的方法:

在重新计算视图的情况下:

  • 删除现有聚合并从头开始构建,重放事件存储中的事件。

如果事件总线提供“至少一次”(Azure 函数与 CosmosDb 触发器的行为方式是底层 ChangeFeedProcessor 的结果):

  • 版本 1:跟踪作为聚合文档中聚合的一部分的事件 ID(文档 ID),并忽略已经是聚合一部分的事件。
  • 版本 2:提供源事件的顺序版本(序列号),并将聚合所基于的版本存储在聚合文档中。计算聚合时,请根据事件的序列号检查此序列号。如果事件的序列号低于聚合文档:忽略,否则重新计算聚合并更新聚合所基于的序列号。

推荐阅读