No EntityManager with actual transaction available when publishing an event to a Saga

Problem description

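I'm using Axon 4.0.3 + Spring Boot 2 + Spring Data (PostgreSQL) with the default configuration.

After publishing an event to the EventStore and waiting for it to be picked up by a @SagaEventHandler, I get the following exception: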

javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:292) ~[spring-orm-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.sun.proxy.$Proxy104.persist(Unknown Source) ~[na:na]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:1.8.0_191]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_191]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[na:1.8.0_191]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[na:1.8.0_191]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_191]
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:1.8.0_191]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:1.8.0_191]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_191]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[na:1.8.0_191]
    at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:276) ~[axon-eventsourcing-4.0.3.jar:4.0.3]
    at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:98) ~[axon-eventsourcing-4.0.3.jar:4.0.3]

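What additional configuration does the EventStore need to handle this case?

P.S. Adding @Transactional to the method fixes the problem, but I don't understand why it is necessary.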

Minimal code example (the endpoint 127.0.0.1:8080/1 works, but the other endpoint, 127.0.0.1:8080/2, does not):

@SpringBootApplication
class TestAxonApplication

class UserId(val userId: String = IdentifierFactory.getInstance().generateIdentifier()) : Serializable

class TestCommand(@TargetAggregateIdentifier val userId: UserId)

class TestedEvent(val userId: UserId)

fun main(args: Array<String>) {
    runApplication<TestAxonApplication>(*args)
}

@RestController
@RequestMapping
class Controller(var commandGateway: CommandGateway, var eventStore: EventStore) {

    @GetMapping("/1")
    fun done(): UserId? {
        return commandGateway.sendAndWait<UserId>(TestCommand(UserId()))
    }

    @GetMapping("/2")
    fun failure() {
        eventStore.publish(
                GenericEventMessage.asEventMessage<Void>(
                        TestedEvent(UserId())
                )
        )
    }

}

@Aggregate
class User() {

    @AggregateIdentifier
    private lateinit var userId: UserId

    @CommandHandler
    constructor(cmd: TestCommand) : this() {
        AggregateLifecycle.apply(TestedEvent(cmd.userId))
    }

    @EventHandler
    fun on(event: TestedEvent) {
        this.userId = event.userId
    }

}

@Saga
@ProcessingGroup("mySaga")
class MySaga {

    @StartSaga
    @SagaEventHandler(associationProperty = "userId")
    fun start(event: TestedEvent) {
        println("DONE ${event.userId.userId}")
    }

}

Tags: spring-boot, jpa, event-sourcing, saga, axon

Solution


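The difference between the two calls is that one goes through the command bus, while the other skips it and publishes directly to the event bus. By default, a TransactionManager is configured on the command bus. That is not the case for the event bus, however.

This means you are publishing the event without an active transaction, so the JPA event storage engine calls persist on an EntityManager that has no transaction to join. Hibernate doesn't like that.

The solution is to put @Transactional on your endpoint, so that a transaction is active when the event is stored.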
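
As a minimal sketch of that fix, the failing /2 endpoint from the question could be rewritten as below. The controller name TransactionalPublishController is hypothetical, TestedEvent and UserId are the classes from the question, and the sketch assumes the project applies the kotlin-spring compiler plugin (or declares the class and method open) so that Spring can proxy the controller.

```kotlin
import org.axonframework.eventhandling.GenericEventMessage
import org.axonframework.eventsourcing.eventstore.EventStore
import org.springframework.transaction.annotation.Transactional
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

// Hypothetical replacement for the /2 endpoint of the question's Controller.
@RestController
class TransactionalPublishController(private val eventStore: EventStore) {

    // @Transactional asks Spring to open a transaction before the method body
    // runs and to commit it afterwards, so the JPA event storage engine finds
    // an active transaction when it persists the event.
    @Transactional
    @GetMapping("/2")
    fun publish() {
        eventStore.publish(
            GenericEventMessage.asEventMessage<TestedEvent>(TestedEvent(UserId()))
        )
    }
}
```

The /1 endpoint needs no such annotation, because the command bus already wraps command handling in a transaction.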

