Publishing a new event from an event handler does not work in Axon

Problem description

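I have two separate applications running on the same server.

  1. User Management
  2. Wallet Management

I have implemented event sourcing from User Management to Wallet Management. It works fine.

However, when I publish a new event from an event handler in the Wallet Management application, I get the following error in the log.

Here are my log details: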

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [772c7f69-534e-4ee2-b198-5b4d2edd3497] at sequence [0] could not be persisted
    at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:112)
    at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:223)
    at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:85)
    at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:64)
    at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:210)
    at org.axonframework.eventhandling.AbstractEventBus.lambda$null$4(AbstractEventBus.java:145)
    at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:68)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:131)
    at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:891)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:131)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:214)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92)
    at org.axonframework.eventhandling.AbstractEventProcessor.process(AbstractEventProcessor.java:116)
    at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:142)
    at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:32)
    at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:135)
    at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.lambda$onMessage$1(SpringAMQPMessageSource.java:90)
    at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
    at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.onMessage(SpringAMQPMessageSource.java:90)
    at com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(AxonConfiguration.java:67)
    at sun.reflect.GeneratedMethodAccessor273.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182)
    ... 10 common frames omitted
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:289)
    at com.sun.proxy.$Proxy224.persist(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:218)
    ... 37 common frames omitted

Event handler class

package com.peaas.ngapblueprintdemo.wallet.eventHandlers;

import java.util.UUID;

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.peaas.ngapblueprintdemo.events.CreateUserProfileEvent;
import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;
import com.peaas.ngapblueprintdemo.wallet.commands.WalletCreatedCommand;
import com.peaas.ngapblueprintdemo.wallet.domain.Wallet;
import com.peaas.ngapblueprintdemo.wallet.repository.WalletRepository;

@ProcessingGroup("amqpEvents")
@Component  
public class UserEventHandler {

    @Autowired
    private WalletRepository walletRepository;

    @Autowired
    private transient CommandGateway commandGatway;

    @EventHandler
    public void onCreateUserProfile(CreateUserProfileEvent event) {
        System.out.println("--- Wallet Event Handler ---");
        Wallet wallet = new Wallet();
        wallet.setAmount(0d);
        wallet.setUserid(event.getUserId());
        wallet = walletRepository.save(wallet);
        String walletID = UUID.randomUUID().toString();
        WalletCreatedCommand command = new WalletCreatedCommand(wallet.getId(),walletID,wallet.getAmount(),wallet.getUserid());
        command.setId(wallet.getId());
        commandGatway.send(command);

    }

    @EventHandler
    public void onCreateWalletEvent(WalletCreatedEvent event) {
        System.out.println("--- Wallet Created Successfully ---");
        System.out.println(event);
    }
}

Aggregate class

package com.peaas.ngapblueprintdemo.wallet.aggregate;

import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateLifecycle;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.spring.stereotype.Aggregate;

import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;
import com.peaas.ngapblueprintdemo.wallet.commands.WalletCreatedCommand;

@Aggregate
public class WalletAggregate {

    private Long id;
    @AggregateIdentifier
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private String walletId;
    private Double amount;
    private Long userId;

    @CommandHandler
    public WalletAggregate(WalletCreatedCommand command){
        System.out.println("--- Command Handler Start : WalletCreatedCommand  ---");
        AggregateLifecycle.apply(new WalletCreatedEvent(command.getUserId(),command.getWalletId(),command.getAmount(),command.getUserId()));
        System.out.println("--- Command Handler End : WalletCreatedCommand ---");
    }

    @EventSourcingHandler
    public void handle(WalletCreatedEvent event) {
        System.out.println("--- Event Sourcing Handler Start : WalletCreatedEvent  ---");
        this.id = event.getId();
        this.walletId = event.getWalletId();
        this.amount = event.getAmount();
        this.userId = event.getUserId();
        System.out.println("--- Event Sourcing Handler End : WalletCreatedEvent  ---");
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    @Override
    public String toString() {
        return "WalletAggregate [id=" + id + ", amount=" + amount + ", userId=" + userId + "]";
    }

}

Command class

package com.peaas.ngapblueprintdemo.wallet.commands;

import org.axonframework.commandhandling.TargetAggregateIdentifier;

public class WalletCreatedCommand {
    private Long id;
    @TargetAggregateIdentifier
    private String walletId;
    private Double amount;
    private Long userId;

    public WalletCreatedCommand() {

    }

    public WalletCreatedCommand(Long id,String walletId, Double amount, Long userId) {
        super();
        this.id = id;
        this.walletId = walletId;
        this.amount = amount;
        this.userId = userId;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getWalletId() {
        return walletId;
    }

    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }

    @Override
    public String toString() {
        return "WalletCreatedCommand [id=" + id + ", walletId=" + walletId + ", amount=" + amount + ", userId=" + userId
                + "]";
    }

}

Event class

package com.peaas.ngapblueprintdemo.events;

public class WalletCreatedEvent {
    private Long id;
    private String walletId;
    private Double amount;
    private Long userId;

    public WalletCreatedEvent() {

    }

    public WalletCreatedEvent(Long id,String walletId, Double amount, Long userId) {
        super();
        this.id = id;
        this.walletId = walletId;
        this.amount = amount;
        this.userId = userId;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getWalletId() {
        return walletId;
    }

    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }

    @Override
    public String toString() {
        return "WalletCreatedEvent [id=" + id + ", walletId=" + walletId + ", amount=" + amount + ", userId=" + userId
                + "]";
    }

}

Thanks

Tags: java, spring, hibernate, axon

Solution


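My hunch is that you are missing a TransactionManagingInterceptor on your CommandBus. Without it, command handling, and therefore the event publication it triggers, is not executed within a transaction. That would match the TransactionRequiredException at the bottom of your stack trace, which is thrown when JpaEventStorageEngine tries to persist the event.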

You can register the TransactionManagingInterceptor as a handler interceptor on your CommandBus by doing the following:

@Bean
public CommandBus commandBus(TransactionManager transactionManager) {
    // While you're at it, you could also instantiate an AsynchronousCommandBus if you'd want.
    SimpleCommandBus commandBus = new SimpleCommandBus();
    // Run each command handler, and the event publication it triggers, inside a transaction.
    commandBus.registerHandlerInterceptor(new TransactionManagingInterceptor<>(transactionManager));
    return commandBus;
}
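
To illustrate why the interceptor matters, here is a minimal, framework-free sketch of the idea. It is not Axon's implementation: FakeTransactionManager, FakeEventStore and inTransaction are hypothetical stand-ins, made up for this example, that only mimic the behaviour seen in the stack trace above (a persist call that refuses to run without an active transaction).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TransactionInterceptorSketch {

    /** Hypothetical stand-in for a transaction manager; not Axon's TransactionManager. */
    static class FakeTransactionManager {
        private boolean active = false;
        final List<String> log = new ArrayList<>();

        void begin()    { active = true;  log.add("begin"); }
        void commit()   { active = false; log.add("commit"); }
        void rollback() { active = false; log.add("rollback"); }
        boolean isActive() { return active; }
    }

    /** Hypothetical event store that, like a JPA persist call, requires an active transaction. */
    static class FakeEventStore {
        private final FakeTransactionManager tx;
        final List<String> events = new ArrayList<>();

        FakeEventStore(FakeTransactionManager tx) { this.tx = tx; }

        void append(String event) {
            if (!tx.isActive()) {
                throw new IllegalStateException("No transaction available for current thread");
            }
            events.add(event);
        }
    }

    /** Runs the handler between begin() and commit(), rolling back if it throws. */
    static <R> R inTransaction(FakeTransactionManager tx, Supplier<R> handler) {
        tx.begin();
        try {
            R result = handler.get();
            tx.commit();
            return result;
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        }
    }

    public static void main(String[] args) {
        FakeTransactionManager tx = new FakeTransactionManager();
        FakeEventStore store = new FakeEventStore(tx);

        // Without the wrapper, appending fails, much like the TransactionRequiredException above.
        try {
            store.append("WalletCreatedEvent");
        } catch (IllegalStateException e) {
            System.out.println("Without transaction: " + e.getMessage());
        }

        // With the wrapper, the same call runs inside a transaction and is committed.
        inTransaction(tx, () -> { store.append("WalletCreatedEvent"); return null; });
        System.out.println("Stored events: " + store.events);
        System.out.println("Transaction log: " + tx.log);
    }
}
```

In the real application, the registered TransactionManagingInterceptor plays the role of inTransaction here: the command handler in WalletAggregate, and the event it applies, would run inside a transaction started by the configured TransactionManager.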
