首页 > 解决方案 > 使用 RailsEventStore 仅订阅单个流

问题描述

我打算使用 RailsEventStore 将持久读取模型投影到关系数据库中。

为了使这成为可能,我需要一个具有固定顺序且没有重复事件的输入流。因此,我构建了一个链接器,它监听所有事件并将那些与我的投影相关的事件链接到一个单独的流中。

现在我想为给定的读取模型注册一个构建器作为该流的订阅者。但我找不到订阅特定流的方法。

这甚至可能吗?如果是这样,怎么办?

标签: ruby-on-railsrubycqrsevent-sourcingrails-event-store

解决方案


无法从选定的流中订阅事件(但是这个想法似乎很有趣)。您需要实现构建器以从给定的流中读取并定期处理事件。它需要存储最后处理的事件 id,下次运行时它应该只读取链接到流的新域事件。

(伪)代码可能如下所示:

class Builder
  def initialize(event_store)
    @event_store = event_store
    @last_processed_event_id = read_last_processed_event
  end

  def call(stream)
    events = read_all_since(stream, event_id)
    process(events)
    store_last_processed_event
  end

  private
  def read_all_since(stream, id)
    @event_store.read.stream(stream).from(id).to_a
  end

  def process(events)
    events.each do |event|
      #... do sth here
      @last_processed_event_id = event.event_id
    end
  end

  def load_last_processed_event
    # read last processed event or return nil to start from beginning of stream
  end

  def store_last_processed_event
    # store it somewhere
  end
end

推荐阅读