首页 > 解决方案 > Axon 框架重试逻辑

问题描述

我正在尝试使用eventProcessingModuleAxon 4.1+ 在 2 JVM 节点 K8 集群上执行重播事件。虽然我设置它会清理事件,但它只从一个节点中拾取它,而另一个节点继续运行,因为它的跟踪事件仍然存在。

我如何才能让它同时在所有 JVM 上被禁用,以便它可以正确重播?然后启用所有这些以继续处理命令。

我尝试通过此代码增加线程,这导致另一个问题,即现有令牌永远不会在 InitialSemgmentsCount 中增加,除非我从数据库中完全删除令牌。

    public void config(final EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessorConfiguration(c ->
                TrackingEventProcessorConfiguration
                        .forParallelProcessing(2)
                        .andInitialSegmentsCount(2));
        // .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
    }

我当前的设置:

# Application.yml
axon:
  distributed:
    enabled: true

具有以下内容的服务组件:

自动接线EventProcessingConfiguration eventProcessingModule

eventProcessingModule
        .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
        .ifPresent(trackingEventProcessor -> {
            trackingEventProcessor.shutDown();
            trackingEventProcessor.resetTokens();
        });

// Thread.sleep() to verify with

eventProcessingModule
        .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
        .ifPresent(trackingEventProcessor -> {
            trackingEventProcessor.start();
        });

示例代码在单个关闭/重置/启动设置中具有上述内容,但我将它们拆分只是为了查看它是如何工作的(相信在重置之后调用 @ResetHandler 因此启动等待,但不完全确定)

SkuProjection 组件用@ResetHandlervoid 方法清理要重播的表。

根据当前哪个 JVM 拥有令牌,另一个将返回此错误: SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'

我认为该token_entry表会在其“停止”时变为所有者的“空”,但由于有 2 个 JVM,当前没有令牌的一个将在另一个重放时接管它。在第一个节点停止其处理器后,确认第二个节点正在使用令牌运行。

标签: spring-bootaxon

解决方案


我想我可以在这里给你一些指导。

您正在研究如何跨多个 JVM 将操作委托给一组特定的跟踪事件处理器,对吗?现在,API Axon Framework 为您提供了TrackingEventProcessor关于TrackingEventProcessor.

因此,执行、start()或是 对特定跟踪事件处理器实例执行的调用。shutDown()processingStatus()resetTokens()

注意:该resetTokens()方法有效地为跟踪事件处理器负责的事件处理程序发出重播。

此外,您会看到Unable to claim token异常,因为框架要求您TrackingEventProcessor执行重置是给定处理组的所有令牌的所有者。这样做的原因是它需要调整令牌以ReplayTokens支持像@ResetHandler.

如果我是正确的,您现在要求的是:

Axon Framework 是否提供了一种将特定跟踪事件处理器操作委托给处理同一处理组的所有实例的方法?

我想我很遗憾不得不在这里让你失望@sherring,Axon Framework 没有提供委托这种操作的方法。实现这些委托启动/停止/重置操作的最快方法是设置Axon Server,因为标准版(免费)已经为您提供了此功能。

如果出于某种原因,这款免费软件无法使用,这意味着您必须自己为 start/stop/reset/{insert-any-TrackingEventProcessor-operation} 创建这样的委托系统。或者,您可以更多地从运营的角度来处理这个问题。因此,关闭给定 Axon Framework 应用程序的第二个实例,确保只有一个TrackingEventProcessor您要重置的实例。

希望这可以帮助你@sherring!


推荐阅读