node.js - Nodejs ts:事件溯源和 cqrs、事件总线
问题描述
你好我有一个命令总线,一个查询总线,它基本上有一个带有命令或查询名称和处理程序的密钥对,然后我执行应该发布我的事件的命令。但我对如何做我的事件总线有些怀疑。命令总线是事件总线的一部分吗?我怎么能用处理程序做一个事件总线
命令总线:
export interface ICommand {
}
export interface ICommandHandler<
TCommand extends ICommand = any,
TResult = any
> {
execute(command: TCommand): Promise<TResult>
}
export interface ICommandBus<CommandBase extends ICommand = ICommand> {
execute<T extends CommandBase>(command: T): Promise<any>
register(data:{commandHandler: ICommandHandler, command: ICommand}[]): void
}
命令总线实现:
export class CommandBus<Command extends ICommand = ICommand>
implements ICommandBus<Command> {
private handlers = new Map<string, ICommandHandler<Command>>()
public execute<T extends Command>(command: T): Promise<any> {
const commandName = this.getCommandName(command as any)
const handler = this.handlers.get(commandName)
if (!handler) throw new Error(``)
return handler.execute(command)
}
public register(
data: { commandHandler: ICommandHandler; command: ICommand }[],
): void {
data.forEach(({command,commandHandler}) => {
this.bind(commandHandler, this.getCommandName(command as any))
})
}
private bind<T extends Command>(handler: ICommandHandler<T>, name: string) {
this.handlers.set(name, handler)
}
private getCommandName(command: Function): string {
const { constructor } = Object.getPrototypeOf(command)
return constructor.name as string
}
}
这里出现了另一个问题,谁应该负责在我的事件数据库中发布事件或读取我的事件数据库的流是我的类事件存储?
事件存储类:
export class EventStoreClient {
[x: string]: any;
/**
* @constructor
*/
constructor(private readonly config: TCPConfig) {
this.type = 'event-store';
this.eventFactory = new EventFactory();
this.connect();
}
connect() {
this.client = new TCPClient(this.config);
return this;
}
getClient() {
return this.client;
}
newEvent(name: any, payload: any) {
return this.eventFactory.newEvent(name, payload);
}
close() {
this.client.close();
return this;
}
}
然后我怀疑如何使用我的事件处理程序和我的事件来实现我的事件总线。
如果有人可以帮助我,我会很高兴..
事件接口:
export interface IEvent {
readonly aggregrateVersion: number
readonly aggregateId: string
}
export interface IEventHandler<T extends IEvent = any> {
handle(event: T): any
}
也许用法:
commandBus.execute(new Command())
class commandHandler {
constructor(repository: IRepository, eventBus ????){}
execute(){
//how i can publish an event with after command handler logic with event bus her
}
}
解决方案
我发现各种总线和 Event Store 之间存在一些混淆。在尝试实现事件总线之前,您需要回答一个重要问题,它是任何事件溯源实现的基础:
- 如何将事件存储保存为唯一的真相来源?
也就是说,您的事件存储包含域的完整状态。这也意味着事件总线的消费者(无论它最终是什么——消息队列、流平台、Redis 等)应该只获取持久化的事件。因此,目标变为:
- 仅在 Bus 上传递持久保存到 Store 的事件(因此,如果您在写入 Store 时遇到错误,或者可能是并发异常,请不要通过总线传递!)
- 将所有事件传递给所有感兴趣的消费者,而不会丢失任何事件
这两个目标直观地转化为“我想要事件存储和事件总线之间的原子提交”。当它们是相同的东西时,这是最容易实现的!
因此,与其考虑如何将“事件总线”连接到命令处理程序并来回发送事件,不如考虑如何从事件存储中检索已经持久化的事件并订阅它。这也消除了命令处理程序和事件订阅者之间的任何依赖关系——它们位于事件存储的不同端(写入器与读取器),并且可能在不同的进程中,在不同的机器上。
推荐阅读
- c# - 来自多维数组c#的多维类数组
- c - 如何在 C 中使用 ctrlz 终止后台进程?
- linux - 为什么某些内核配置没有显示在 .config 中
- haskell - 使用 Barbies 或 Higgledy 将应用效果推向 HKD 领域
- html - 仅适用于 max-height CSS 的更好动画
- java - 使用 Spring Boot 和 Jackson ObjectMapper 时 OffsetDateTime 属性 JSON 序列化的区别
- amazon-route53 - 弹性 IP 与 AWS EC2 实例一起使用,但 Route 53 无法解析域名
- python - 无法在 django 项目中的模板文件夹上呈现 html 页面?
- go - 在 html 代码 Goland IDE 中禁用 sql 语法高亮
- csv - 如何在颤振中打开 CSV 文件?