spring-boot - Axon 框架重试逻辑
问题描述
我正在尝试使用eventProcessingModule
Axon 4.1+ 在 2 JVM 节点 K8 集群上执行重播事件。虽然我设置它会清理事件,但它只从一个节点中拾取它,而另一个节点继续运行,因为它的跟踪事件仍然存在。
我如何才能让它同时在所有 JVM 上被禁用,以便它可以正确重播?然后启用所有这些以继续处理命令。
我尝试通过此代码增加线程,这导致另一个问题,即现有令牌永远不会在 InitialSemgmentsCount 中增加,除非我从数据库中完全删除令牌。
public void config(final EventProcessingConfigurer configurer) {
configurer.registerTrackingEventProcessorConfiguration(c ->
TrackingEventProcessorConfiguration
.forParallelProcessing(2)
.andInitialSegmentsCount(2));
// .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
}
我当前的设置:
# Application.yml
axon:
distributed:
enabled: true
具有以下内容的服务组件:
自动接线EventProcessingConfiguration eventProcessingModule
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.shutDown();
trackingEventProcessor.resetTokens();
});
// Thread.sleep() to verify with
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.start();
});
示例代码在单个关闭/重置/启动设置中具有上述内容,但我将它们拆分只是为了查看它是如何工作的(相信在重置之后调用 @ResetHandler 因此启动等待,但不完全确定)
SkuProjection 组件用@ResetHandler
void 方法清理要重播的表。
根据当前哪个 JVM 拥有令牌,另一个将返回此错误:
SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'
我认为该token_entry
表会在其“停止”时变为所有者的“空”,但由于有 2 个 JVM,当前没有令牌的一个将在另一个重放时接管它。在第一个节点停止其处理器后,确认第二个节点正在使用令牌运行。
解决方案
我想我可以在这里给你一些指导。
您正在研究如何跨多个 JVM 将操作委托给一组特定的跟踪事件处理器,对吗?现在,API Axon Framework 为您提供了TrackingEventProcessor
关于TrackingEventProcessor
.
因此,执行、start()
或是 对特定跟踪事件处理器实例执行的调用。shutDown()
processingStatus()
resetTokens()
注意:该
resetTokens()
方法有效地为跟踪事件处理器负责的事件处理程序发出重播。
此外,您会看到Unable to claim token
异常,因为框架要求您TrackingEventProcessor
执行重置是给定处理组的所有令牌的所有者。这样做的原因是它需要调整令牌以ReplayTokens
支持像@ResetHandler
.
如果我是正确的,您现在要求的是:
Axon Framework 是否提供了一种将特定跟踪事件处理器操作委托给处理同一处理组的所有实例的方法?
我想我很遗憾不得不在这里让你失望@sherring,Axon Framework 没有提供委托这种操作的方法。实现这些委托启动/停止/重置操作的最快方法是设置Axon Server,因为标准版(免费)已经为您提供了此功能。
如果出于某种原因,这款免费软件无法使用,这意味着您必须自己为 start/stop/reset/{insert-any-TrackingEventProcessor-operation} 创建这样的委托系统。或者,您可以更多地从运营的角度来处理这个问题。因此,关闭给定 Axon Framework 应用程序的第二个实例,确保只有一个TrackingEventProcessor
您要重置的实例。
希望这可以帮助你@sherring!
推荐阅读
- java - 如何从结果集中打印整列?
- python - Pandas 对返回的 NA 重新采样进行插值
- docker - 'docker network inspect host' 没有 IP 地址
- c - QT Creator CMake 没有 C 编译器路径集
- r - 使用 dplyr 在特定日期内平均观察
- java - Java:如何打印覆盖对象的 toString 方法的 Guava Table
- html - 图片标签不显示图像,读取“源 0x0”
- android - Android - 设置菜单上未正确显示修改后的首选项
- julia - Julia:没有方法匹配重载的 parse()
- c - scanf 使用指向**变量**的指针,而不是**变量本身**