java - Spring Cloud Stream with RabbitMQ binder,如何应用@Transactional?
问题描述
我有一个Spring Cloud Stream应用程序,它使用Rabbit Binder从RabbitMQ接收事件。我的申请可以总结为:
@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
// Transform events and store them in MongoDB using
// spring-boot-data-mongodb-reactive
...
}
问题是它似乎不适@Transactional
用于 Spring Cloud Stream(或者至少这是我的印象),因为如果在写入 MongoDB 时出现异常,则事件似乎已经被 ack:ed 到 RabbitMQ 并且操作不会重试.
鉴于我想实现与spring-amqp@Transactional
使用around a 函数时基本相同的功能:
- 将 Spring Cloud Stream 与 Rabbit Binder 一起使用时,是否必须手动向 RabbitMQ 确认消息?
- 如果是这样,我怎样才能做到这一点?
解决方案
这里有几个问题。
- 确认消息不需要事务
- 基于反应器的
@StreamListener
方法只被调用一次,只是为了设置该方法是没有意义的 - 消息然后流经通量,因此与单个消息有关的任何事情都必须在通量的上下文中完成Flux
。@Transactional
- Spring Transactions 绑定到线程 - Reactor 是非阻塞的;该消息将在第一次切换时得到确认。
是的,您需要使用手动确认;大概是mongodb存储操作的结果。您可能需要使用Flux<Message<Event>>
,以便您可以访问频道和交付标签标题。
推荐阅读
- java - Dialogflow V2,Java API 不会显示基本卡片
- javascript - How to check for Map object keys in Jest
- tcp - TCP 设置中的 PUSH 标志,用于在大 tx 中间发送价值 1 MSS 的数据
- timer - STM32F103 定时器通道模式
- java - 如何使用 Spring AOP 和 WebFlux 获取从 joinPoint.proceed() 返回的对象
- node.js - 我尝试像 Oracle 在 linux ubuntu 上所说的那样安装 ojet-cli globlay,但找不到 ojet 命令
- clips - 剪辑:匹配(或不匹配空字符串)
- c++ - 来自文本输入的二维数组,中间有空格
- sql-server-2012 - 如何获取当前记录上所有字段的 DATALENGTH 的最大值
- python - Python将列表分配给数字