首页 > 解决方案 > 重新启动应用程序时事件处理程序重播?

问题描述

我在本地玩轴突服务器。我正在通过命令在我的本地机器上运行一个 docker 容器docker run -d --name axonserver -p 8024:8024 -p 8124:8124 axoniq/axonserver

当我启动我的 spring-boot 应用程序时,聚合外部的事件处理程序会重新运行所有以前的事件,因此我会在启动时看到这个日志语句流。

同一事件处理程序还将命令发布到聚合,然后聚合将新事件附加到处理命令的聚合。因此,每次我重新启动应用程序时,我的聚合最终都会在其末尾添加一些事件,而我只想要或期望一个事件。

flsh.axon.LetterSchedulingHandler   : Sending letter 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f...
flsh.axon.LetterSchedulingHandler   : Sending letter 6b4f6966-85ea-46e0-9c49-21bcd501a1b5...
flsh.axon.LetterSchedulingHandler   : Sending letter fc36292f-c7bd-4575-b56f-130624a87466...

flsh.axon.Letter                    : LetterScheduledEvent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SCHEDULED
flsh.axon.Letter                    : LetterScheduledEvent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SCHEDULED
flsh.axon.Letter                    : LetterScheduledEvent fc36292f-c7bd-4575-b56f-130624a87466 SCHEDULED
flsh.axon.Letter                    : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter                    : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT

我的事件处理程序如下所示:

@Slf4j
@Component
public class LetterSchedulingHandler {

    private final CommandGateway commandGateway;

    public LetterSchedulingHandler(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    @DisallowReplay //this doesn't seem to work
    @EventHandler
    public void handle(BeginSendLetterEvent event) {
        log.info("Sending letter {}...", event.getLetterId());
        commandGateway.send(new LetterSentCommand(event.getLetterId()));
    }
}
@CommandHandler
public void handle(LetterSentCommand cmd) {
    AggregateLifecycle.apply(new LetterSentEvent(cmd.getLetterId()));
}

这些事件正在通过调度程序发布......通过不同的方式在聚合中运行,@CommandHandler否则会按预期成功运行。

@CommandHandler
public Letter(ScheduleLetterCommand cmd, EventScheduler scheduler) {
    String id = cmd.getLetterId();
    log.info("Received schedule command for letter id {}", id);
    ScheduleToken scheduleToken = scheduler.schedule(Duration.ofSeconds(5), new BeginSendLetterEvent(id));
    AggregateLifecycle.apply(new LetterScheduledEvent(id, scheduleToken));
}

@DisallowReplay注释似乎并没有阻止这一点。另外,我尝试按照此处的说明使处理程序成为Subscribing事件处理器,但要么我做得不对,要么也没有解决问题。

axon:
  axonserver:
    servers: localhost
  eventhandling:
    processors:
      LetterSchedulingHandler:
        mode: subscribing

标签: javaspring-bootaxon

解决方案


您缺少@GoldFLsh,是以下两者之一:

  1. 正确引用EventProcessor你的背后LetterSchedulingHandler
  2. 定义@ProcessingGroup你的LetterSchedulingHandler

您可能已经在事件处理器的参考指南中阅读过,因为任何事件处理组件(LetterSchedulingHandler例如阅读您的)都将与订阅或跟踪事件处理器中的其他事件处理组件组合在一起。

但是,您在此阶段尚未配置任何处理组名称。这意味着LetterSchedulingHandler的处理组将默认为处理程序的包名称。因此,在您的属性文件中,您应该使用包名称而不是将LetterSchedulingHandler其定义为正在订阅。

更清楚的是,在 上添加@ProcessingGroup注释LetterSchedulingHandler,给它一个清晰的名称,然后您可以在属性文件中使用。

最后,您没有正确引用事件处理器的事实是您会看到事件重播的原因。Axon 默认为 a TrackingEventProcessor,它跟踪处理流事件的距离。如果没有持久性单元来存储这些TrackingTokens,它将始终从头开始。


推荐阅读