首页 > 解决方案 > 从 Kafka 消费时 JPA 事务成功后的确认消息

问题描述

在 Quarkus 应用程序中,我想使用 Kafka 消息并使用实体管理器将其信息保存在数据库中。

这是我到目前为止得到的:

@ApplicationScoped
public class ClientEventConsumer {

    @Inject ClientRepository repository;

    private final BlockingQueue<Message<ClientEvent>> messages = new LinkedBlockingQueue<>();

    void startup(@Observes StartupEvent startupEvent) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        executor.scheduleAtFixedRate(() -> {
            if (messages.size() > 0) {
                try {
                    Message<ClientEvent> message = messages.take();
                    ClientEvent clientEvent = message.getPayload();
                    ClientEntity clientEntity = new ClientEntity();
                    clientEntity.setId(clientEvent.getId());
                    clientEntity.setName(clientEvent.getName());
                    repository.merge(clientEntity);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, 1000, 500, TimeUnit.MILLISECONDS);
    }

    @Incoming("qualificationCheck")
    public CompletionStage<Void> consume(Message<ClientEvent> msg) {
        messages.add(msg);
        return msg.ack();
    }
}

但是使用这种方法,消息会在记录实际保存在数据库中之前得到确认。如果 JPA 事务成功,有没有办法只确认消息?

标签: quarkussmallrye-reactive-messaging

解决方案


推荐阅读