spring-boot - 为什么 Axon 从第二个请求开始抛出 ConcurrencyException?
问题描述
我正在用 Axon 做 POC。我发现 axon 能够处理我的第一个 POST 请求,并且对于所有后续 POST 请求,我得到以下异常。对于每个 Create 请求,我都会创建唯一标识符 IdentifierFactory.getInstance().generateIdentifier(),因此,理想情况下,这应该可以工作,我看到这也从断点发生了变化,但索引 id 变得相同。
有人可以在这里找到丢失的部分。
org.hsqldb.HsqlException: integrity constraint violation: unique constraint or index violation; UK8S1F994P4LA2IPB13ME2XQM1W table: DOMAIN_EVENT_ENTRY
org.axonframework.modelling.command.ConcurrencyException(An event for aggregate [0] at
sequence [0] was already inserted)
发布请求:
请求1:这个成功
curl --location --request POST 'http://localhost:8080/raise/issues' \
--header 'Content-Type: application/json' \
--data-raw '{"description":"Demo issue1","type":"DEMO1"}'
请求 2:从这个开始它失败了
curl --location --request POST 'http://localhost:8080/raise/issues' \
--header 'Content-Type: application/json' \
--data-raw '{"description":"Demo issue2","type":"DEMO2"}'
控制器:
public class IssueTracker {
@Inject
private IssueTrackerService issueTrackerService;
@GetMapping("/issues")
public List<Issue> getAllIssues() {
return issueTrackerService.getAllIssues();
}
@PostMapping(value = "/raise/issues", consumes = "application/json")
public CompletableFuture<IssueCommand> raiseIssue(@RequestBody IssueView issueView) {
return issueTrackerService.raiseIssue(issueView);
}
}
服务:
package com.axon.axondemo.service;
import com.axon.axondemo.dao.Issue;
import com.axon.axondemo.dto.IssueCommand;
import com.axon.axondemo.repository.IssueTRepository;
import com.axon.axondemo.view.IssueView;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.common.IdentifierFactory;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Service
public class IssueTrackerService {
private final IssueTRepository issueTRepository;
private final CommandGateway commandGateway;
public IssueTrackerService(IssueTRepository issueTRepository, CommandGateway commandGateway) {
this.issueTRepository = issueTRepository;
this.commandGateway = commandGateway;
}
public CompletableFuture<IssueCommand> raiseIssue(IssueView issueView) {
return commandGateway.send(new IssueCommand(IdentifierFactory.getInstance().generateIdentifier(), issueView.getDescription(), issueView.getType()));
}
public List<Issue> getAllIssues() {
return issueTRepository.findAll();
}
}
实体:
import com.axon.axondemo.dto.IssueCommand;
import com.axon.axondemo.events.IssueEvent;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@Aggregate
@Entity
public class Issue {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
@AggregateIdentifier
private long id;
private String description;
private String type;
public Issue() {}
public Issue(String description, String type) {
this.description = description;
this.type = type;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@CommandHandler
public Issue(IssueCommand issueCommand) {
apply(new IssueEvent(issueCommand.getAggregateRefno(), issueCommand.getDescription(), issueCommand.getType()));
}
@EventSourcingHandler
public void on(IssueEvent issueEvent) {
this.description = issueEvent.getDescription();
this.type = issueEvent.getType();
}
}
命令:
package com.axon.axondemo.dto;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
public class IssueCommand {
private String description;
private String type;
@TargetAggregateIdentifier
private String aggregateRefno;
public IssueCommand(String aggregateRefno, String description, String type) {
this.aggregateRefno = aggregateRefno;
this.description = description;
this.type = type;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getAggregateRefno() {
return aggregateRefno;
}
public void setAggregateRefno(String aggregateRefno) {
this.aggregateRefno = aggregateRefno;
}
}
事件:
package com.axon.axondemo.events;
public class IssueEvent {
private String aggregateRefno;
private String description;
private String type;
public IssueEvent() {}
public IssueEvent(String aggregateRefno, String description, String type) {
this.description = description;
this.type = type;
this.aggregateRefno = aggregateRefno;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getAggregateRefno() {
return aggregateRefno;
}
public void setAggregateRefno(String aggregateRefno) {
this.aggregateRefno = aggregateRefno;
}
}
查询/处理程序:
package com.axon.axondemo.handler;
import com.axon.axondemo.events.IssueEvent;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;
@Component
public class IssueEventHandler {
@EventHandler
public void on(IssueEvent issueEvent) {
System.out.println("*************");
System.out.println("*************");
System.out.println("Issue event handled!!!!");
System.out.println(issueEvent.getDescription());
System.out.println("*************");
System.out.println("*************");
}
}
存储库:
package com.axon.axondemo.repository;
import com.axon.axondemo.dao.Issue;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface IssueTRepository extends JpaRepository<Issue, Long> {
}
解决方案
@Flaxel 他/她的论点需要注意:
我不会将实体和聚合实现为公共对象。
我要补充一点,你在那里做的事情绝对没有错。主要区别在于,如果您将聚合按原样作为存储实体,则您不会执行事件溯源。你有一个选择,从 Axon 的参考指南中你可以选择"State-Stored Aggregate"。但是,您的聚合片段确实使用了带@EventSourcingHandler
注释的方法,似乎表明您确实希望对所述聚合使用事件溯源。因此,值得在您的聚合设计中采用状态存储或事件源方法来保持清晰。但是,这并不能解决您遇到的问题,因此让我们进一步关注这一点。
您收到的异常正在发送,因为您的应用程序尝试将同一聚合的事件存储在同一位置。通常这表明您的服务的两个不同实例正在加载相同的聚合并对其执行操作,这是不可取的,因为它引入了并发异常。因此,为什么 Axon 会抛出ConcurrencyException
.
正如您从消息中看到的,唯一性约束是由聚合标识符和序列号构建的。后者是描述聚合流中事件位置的增量数字。您无法立即控制此值。你控制的是聚合标识符。
目前,您的注释字段与注释字段@AggregateIdentifier
相同@Id
。这又没有错。我不会做的就是把它变成一个long
. 使用long
(无论是否生成)都会使您经常看到并发异常,尤其是当您开始向外扩展时。假设您有四个正在运行的应用程序实例,所有实例都同时处理命令。您是否会使用分布式序列生成器,以便聚合标识符都排成一行?可行,是的,但它为此引入了相当多的复杂性。
我建议使用常规随机UUID
作为@AggregateIdentifier
注释字段。在这种情况下,您更确定(实际上)永远不会遇到重复的 id。
尽管如此,这并没有回答我为什么您发出的第二个命令使得您的序列生成器重用 ID0
而不是调整它。我所知道的是,它不再是 Axon Framework 的事情了,因为这是由于使用了@GeneratedValue
注释而发生的。
@flaxel 引用的 Baeldung 页面可以证明是一个不错的起点,因为它已由 AxonIQ 团队自己更新。最重要的是,您可以查看大量快速入门视频。最后,如果您发现自己被困在未来,参加快速通道轴突训练(仅 2 小时)也可能会有所帮助。
推荐阅读
- kdb - 字符串前的“-1”有什么作用?
- vue.js - vuejs3 模块解析在服务器启动时失败
- docker - 使用 Prometheus 监控 bash 命令或 shell 脚本的结果
- docker - GnuTLS:TLS 连接未正确终止。无法建立 SSL 连接
- arrays - 我对这些情况的时间复杂度是正确的?
- modelica - 模拟后如何自动删除 Dymolas 构建文件?
- kotlin - onMapReady 函数似乎没有在 android 应用程序上调用
- r - 如何根据曝光调整模型中的不同协变量?
- r - 将日期列合并为一个
- next.js - 在 WriteStream 实例上发出“错误”事件