首页 > 解决方案 > Spring Cloud Stream with RabbitMQ binder,如何应用@Transactional?

问题描述

我有一个Spring Cloud Stream应用程序,它使用Rabbit Binder从RabbitMQ接收事件。我的申请可以总结为:

@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
       // Transform events and store them in MongoDB using 
       // spring-boot-data-mongodb-reactive
       ...
}

问题是它似乎不适@Transactional用于 Spring Cloud Stream(或者至少这是我的印象),因为如果在写入 MongoDB 时出现异常,则事件似乎已经被 ack:ed 到 RabbitMQ 并且操作不会重试.

鉴于我想实现与spring-amqp@Transactional使用around a 函数时基本相同的功能:

  1. 将 Spring Cloud Stream 与 Rabbit Binder 一起使用时,是否必须手动向 RabbitMQ 确认消息?
  2. 如果是这样,我怎样才能做到这一点?

标签: javaspring-bootrabbitmqspring-data-mongodbspring-cloud-stream

解决方案


这里有几个问题。

  1. 确认消息不需要事务
  2. 基于反应器的@StreamListener方法只被调用一次,只是为了设置该方法是没有意义的 - 消息然后流经通量,因此与单个消息有关的任何事情都必须在通量的上下文中完成Flux@Transactional
  3. Spring Transactions 绑定到线程 - Reactor 是非阻塞的;该消息将在第一次切换时得到确认。

是的,您需要使用手动确认;大概是mongodb存储操作的结果。您可能需要使用Flux<Message<Event>>,以便您可以访问频道和交付标签标题。


推荐阅读